Integrating JBoss Fuse ESB with Active Directory

As of the time of this writing, I could not find a documented recipe for using Active Directory as the authentication and authorization backend of JBoss Fuse ESB. The official documentation on Enabling LDAP Authentication describes how to integrate with Apache Directory Server, which has some key differences from Microsoft Active Directory.

The process to use Active Directory is actually rather simple once you know what to do.

We will make these assumptions for this exercise:

  • Domain Name: fqdn.local
  • The OU where the ESB’s groups will be found is: ou=users,dc=fqdn,dc=local (adjust this DN if your groups live elsewhere, such as the default CN=Users container)

First, let’s create a new XML file to represent our OSGi Blueprint module.

Step 1: Create file ldap-module.xml

<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
  xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.1.0"
    xmlns:jaas="http://karaf.apache.org/xmlns/jaas/v1.0.0">
  
  <jaas:config name="karaf" rank="9">
    <jaas:module className="org.apache.karaf.jaas.modules.ldap.LDAPLoginModule"
                     flags="required">
      initialContextFactory = com.sun.jndi.ldap.LdapCtxFactory
      connection.username = my_service_account
      connection.password = *********
      connection.url = ldap://domaincontroller.fqdn.local:389
      user.filter = (samAccountName=%u)
      user.base.dn = dc=fqdn,dc=local
      user.search.subtree = true
      role.name.attribute = cn
      role.filter = (member=%dn,dc=fqdn,dc=local)
      role.base.dn = ou=users,dc=fqdn,dc=local
      role.search.subtree = true
      authentication = simple
      debug=true
    </jaas:module>
  </jaas:config>
  
</blueprint>

The above file defines a JAAS module that creates an instance of the built-in org.apache.karaf.jaas.modules.ldap.LDAPLoginModule. Its configuration is the key to successfully integrating with AD. We’ll take a look at some of the properties now.

First, connection.username and connection.password are the credentials of the service account that the ESB will use to perform its LDAP lookups. While not shown here, I do recommend that you externalize and encrypt these values using the Config Admin Service.

Next, see that the user.filter property is set to use the samAccountName attribute to look up users by username in AD. We set user.base.dn to the top of our AD forest with the value dc=fqdn,dc=local. You can constrain which users are able to log in by ANDing an additional LDAP predicate onto the user.filter, for example one requiring that the user also be a member of some “ESB Users” group.

Example:

user.filter = (&amp;(samAccountName=%u)(memberof=cn=ESB\ Users,cn=users,dc=fqdn,dc=local))

(The ampersand is written as &amp; because this filter sits inside the blueprint XML file and must be XML-escaped.)

This takes care of authentication, but it does not provide authorization. That is where the role-related properties come in. We set role.name.attribute to “cn” (Common Name), which in Active Directory corresponds to the actual group name.

Next, note that we defined a role.filter. This is very important to get right. Ours specifies an LDAP query that finds the groups to which the authenticated user belongs. In the query (member=%dn,dc=fqdn,dc=local), the member attribute must contain the user’s fully qualified “dn” (Distinguished Name). Notice the %dn variable in member=%dn: Fuse will replace this variable with the relative DN of the user being authorized. See in my configuration that I append the remaining suffix “dc=fqdn,dc=local” to complete the full Distinguished Name. YOU MUST DO THIS OR THE FILTER WILL NOT WORK AT ALL. This is what allows JBoss Fuse to find the AD groups for a user.
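
For example, assuming a hypothetical user whose relative DN resolves to cn=jsmith,cn=users, the filter that Fuse actually issues would be:

(member=cn=jsmith,cn=users,dc=fqdn,dc=local)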

Lastly, you should be aware that since we specified a role.base.dn of ou=users,dc=fqdn,dc=local, the groups used by the ESB must exist at or below that OU.

Step 2: Deploy file ldap-module.xml

You can deploy this file to AD-enable your instance of JBoss Fuse by simply copying it to the $FUSE_HOME/deploy directory.

With that, you’re rocking. You are now using Active Directory as the User backend of your ESB.

Step 3: Adjust the karaf.admin.role in system.properties

One thing you’ll want to do at this point is make it possible to log in to Karaf and the Management Web Console with AD users. To enable this, just edit the file $FUSE_HOME/etc/system.properties and set the property karaf.admin.role to a group name in AD.

Example:

karaf.admin.role="ESB Administrators"

With a setup like this, only authorized users will be able to log in to the management tools.

That’s all there is to it.

Defining Log4j MDC Fields Declaratively With Spring

In this post, I’m going to show you how to extend the fields Log4j captures via MDC (Mapped Diagnostic Context). Specifically, we will add an additional field called “ApplicationId” that identifies the application a given log entry came from. This is useful when you have many applications logging to a single persistence mechanism, because it enables you to filter or query the logs to include or exclude entries based on this value. For enterprises, which require both wide and deep insight into their instrumentation feeds, the ability to cross-reference and correlate this data across systems and applications is essential. The means to separate a particular log stream from the noise of the others can easily be achieved with the recipe that we’ll look at next.

First, in your Log4j configuration file, you need to set up your ConversionPattern value to include the new ApplicationId field:


log4j.appender.out.layout.ConversionPattern=%X{ApplicationId} | %d{ISO8601} | %-5.5p | %-16.16t | %-32.32c{1} | %m%n

Once this is in place, you can begin populating it by instructing Log4j to add a new key/value pair to its set of tracked MDC fields.

You can set this value in your java code like this:

 
private void setupLogging() {
	org.apache.log4j.MDC.put("ApplicationId", "XZY321");
}

That is how you do it programmatically. Code like the setupLogging() function should run only once, in an initialization routine such as a bootstrapping function. After that, Log4j will automatically place the ApplicationId field into every log entry it generates (note that MDC values are stored per thread, though child threads inherit them). This is nice, as it will now identify the application that made each log entry.
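
To make the “run once” idea concrete, here is a minimal sketch (the class name and bootstrap flow are placeholders of my own, not part of any framework) that performs the MDC.put() from a static initializer so it executes exactly once when the application starts:

import org.apache.log4j.Logger;
import org.apache.log4j.MDC;

public class ApplicationBootstrap {

	private static final Logger LOG = Logger.getLogger(ApplicationBootstrap.class);

	// Runs once when the class is loaded, before any logging happens.
	static {
		MDC.put("ApplicationId", "XZY321");
	}

	public static void main(String[] args) {
		// Every entry logged from here on carries the ApplicationId MDC field.
		LOG.info("Application starting up");
	}
}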

If, however, you do not want to write Java code to achieve this, you can still do it with Spring XML config. In this way, you get the same result with more of a configuration idiom than a coding approach.

Here’s how to run a static method using Spring’s MethodInvokingFactoryBean to add an MDC property to Log4j’s tracked fields:


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
       
      <bean
      	class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
      	<property name="targetClass">
      		<value>org.apache.log4j.MDC</value>
      	</property>
      	<property name="targetMethod">
      		<value>put</value>
      	</property>
      	<property name="arguments">
      		<list>
      			<value>ApplicationId</value>
      			<value>ABC123</value>
      		</list>
      	</property>
      </bean>

</beans>

When your Spring ApplicationContext loads, it will run the static org.apache.log4j.MDC.put() method. In effect, this is similar to running the code programmatically, but in practice it can be treated as an application configuration-level feature. I tend to like this approach better because it does not require a compile and redeploy, which makes it manageable by general tech-ops people. Whichever approach you take, extending Log4j to capture the additional data points relevant to your needs can be a very powerful complement.

Surfing the ReferencePipeline in Java 8

Java 8 includes a new Stream Processing API. At its core is the ReferencePipeline class, the internal implementation behind the Stream interface, which gives us a DSL for working with Streams in a functional style. You can get an instance of a ReferencePipeline flowing with a single expression.


IntStream.range(1, 50)
         .mapToObj(i -> new Thread(() -> System.out.println(i)))
         .forEach(thread -> thread.start());

The MapReduce-style DSL has the essential set of list-processing operations, such as querying, mapping, and iterating. The operations can be chained, forming a pipeline of inline strategies. When the stream of data is pulled through this pipeline, each data element passes through the operation chain.

Streams of data can be folded or reduced to a single value.

For example, here is how you can query a stream and accumulate the matches into a single value:


String value = Stream.of("foo", "bar", "baz", "quux")
		             .filter(s -> s.contains("a") || s.endsWith("x"))
		             .map(s -> s.toUpperCase())
		             .reduce("", (acc, s) -> acc + s); //the "" identity makes reduce return a String rather than an Optional<String>

Figure: pipeline flow and anatomy of the code sample above.

These functions are monadic operations that enable us to query, transform, and project objects down the pipeline. The ReferencePipeline contains these functions and returns a new pipeline instance after each method invocation, giving us a chainable API. This API can be considered architectural scaffolding for describing how to process Streams of data.

See here how a pipeline can take in lines of CSV and emit structured row objects in the form of Arrays:


//start with a stream

String csv = "a,b,c\n"
		   + "d,e,f\n"
		   + "g,h,i\n";

//process the stream

Stream<String[]> rows = new BufferedReader(
		new InputStreamReader(new ByteArrayInputStream(
				csv.getBytes("UTF-8"))))
		.lines()
		.skip(1)
		.map(line -> line.split(","));

//print it out

rows.forEach(row -> System.out.println(row[0]
									 + row[1]
									 + row[2]));

Notice that the stream processing routine is designed as a single expression. The .lines() method initiates the pipeline. Then we skip the header (it is not data), and project out an array of fields with the .map(). This is nice. We’re able to use a higher-order thought process when describing the algorithm to the JVM. Instead of diving into procedural control flow, we simply tell the system what we want it to do using a step-wise functional prescription. This style of programming leads to more readable and comprehensible code. As you would expect, behind the scenes the lambda syntax gets “de-sugared” into instances of Java’s functional interfaces: Predicates (filters), Comparators (sorters), Functions (mappers), and BinaryOperators (reduce accumulators). The lambda expressions make it so we do not have to get our hands dirty with the details of Java’s functional programming object model.
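
To make that concrete, here is a sketch of what the earlier filter/map/reduce example looks like with the lambda sugar removed and the functional interfaces spelled out by hand (the class and variable names are mine, purely for illustration):

import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;

public class DesugaredPipeline {

	public static void main(String[] args) {

		// The filter: a Predicate decides whether an element stays in the stream.
		Predicate<String> hasAOrEndsWithX = new Predicate<String>() {
			@Override
			public boolean test(String s) {
				return s.contains("a") || s.endsWith("x");
			}
		};

		// The mapper: a Function transforms each element.
		Function<String, String> toUpper = new Function<String, String>() {
			@Override
			public String apply(String s) {
				return s.toUpperCase();
			}
		};

		// The accumulator: a BinaryOperator folds two elements into one.
		BinaryOperator<String> concat = new BinaryOperator<String>() {
			@Override
			public String apply(String acc, String s) {
				return acc + s;
			}
		};

		String value = Stream.of("foo", "bar", "baz", "quux")
				.filter(hasAOrEndsWithX)
				.map(toUpper)
				.reduce("", concat);

		System.out.println(value); // BARBAZQUUX
	}
}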

Consider the following example.

I want to download historical stock pricing data from Yahoo and turn it into PricingRecord objects in my system.

Yahoo Finance API

Its data can be acquired with a simple HTTP GET call:

http://real-chart.finance.yahoo.com/table.csv?s=AMD&d=6&e=18&f=2014&g=d&a=7&b=9&c=1996&ignore=.csv

First note the shape of the CSV that Yahoo’s API returns:

Date Open High Low Close Volume Adj Close*
2008-12-29 15.98 16.16 15.98 16.08 158600 16.08

Our CSV looks like this:

2008-12-29,15.98,16.16,15.98,16.08,158600,16.08

Let’s compose a simple recipe that uses the Stream API to pull this data from the web and turn it into objects that we can work with in our system.

Our program should do these things:

  1. Take in a set of stock symbols in the form of a Stream of Strings.
  2. Transform the Stream of symbols into a new Stream containing Streams of PricingRecords.
    • This will be done by making a remote call to Yahoo’s API.
    • The CSV returned should be mapped directly into PricingRecord objects.
  3. Since we’re pulling the data for multiple symbols, we should do the API calls for each concurrently. We can achieve this by parallelizing the flow of stream elements through the operation chain.

Here is the solution implemented as a single composite expression. Note how we acquire a Stream<String>, process it, and emit a Map<String, List<PricingRecord>>.

Using a ReferencePipeline as a builder:


//start with a stream

Stream<String> stockStream = Stream.of("AMD", "APL", "IBM", "GOOG");

//generate data with a stream processing algorithm

Map<String, List<PricingRecord>> pricingRecords = stockStream
		.parallel()
		.map(symbol -> {

			try {
				String csv = new JdkRequest(String.format("http://real-chart.finance.yahoo.com/table.csv?s=%s&d=6&e=18&f=2014&g=d&a=7&b=9&c=1996&ignore=.csv", symbol))
							 .header("Accept", "text/xml")
						     .fetch()
						     .body();

				return new BufferedReader(new InputStreamReader(new ByteArrayInputStream(csv.getBytes("UTF-8"))))
					   .lines()
					   .skip(1)
					   .map(line -> line.split(","))
					   .map(arr -> new PricingRecord(symbol, arr[0],arr[1], arr[2], arr[3], arr[4], arr[5], arr[6]));

			} catch (Exception e) {
				e.printStackTrace();
			}

			return symbol;
		})
		.flatMap(records -> (Stream<PricingRecord>) records)
		.collect(Collectors.groupingBy(PricingRecord::getSymbol));


//print it out..

pricingRecords.forEach((symbol, records) -> System.out.println(String
		.format("Symbol: %s\n%s",
				symbol,
				records.stream()
					   .parallel()
					   .map(record -> record.toString())
					   .reduce((x, y) -> x + y))));

The elegance of this solution may not be apparent at first, but note a few key characteristics that emerge on closer inspection. Notice that we get concurrency essentially for free with the .parallel(). We call it near the beginning of the pipeline in order to feed the downstream .map() function in a multithreaded manner.

Notice also that we’re projecting a two-dimensional Stream out of the .map() function. The top-level stream’s elements are themselves Streams, so the composite type it returns is actually a Stream<Stream<PricingRecord>>. This is a common scenario in stream-based programming, and the way to unpack and harvest the substreams is the .flatMap() function. It provides the affordance we need for working with structure in a two-dimensional stream, and it can be applied repeatedly for more deeply nested streams. In my example, we use .flatMap() to unpack the substreams, casting the elements to coerce them into a Stream<PricingRecord>.
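
As an isolated sketch of that idea (assuming the usual java.util and java.util.stream imports, and unrelated to the stock example), here is a Stream of Streams being flattened:

//a two-dimensional stream: each element is itself a Stream of integers
Stream<Stream<Integer>> nested = Stream.of(Stream.of(1, 2, 3),
                                           Stream.of(4, 5, 6));

//flatMap unpacks each substream and splices its elements into one flat stream
List<Integer> flattened = nested.flatMap(substream -> substream)
                                .collect(Collectors.toList()); // [1, 2, 3, 4, 5, 6]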

Finally, look at the last expression in the chain, the .collect(). To collect the stream is to terminate it, which means to enumerate its full contents. Basically this means loading them into memory, but there are many ways in which you might want to do that. For this we have what are called Collectors; they allow us to describe how we want the contents of the stream organized when they are pulled out.

Usage Tip:

If you want a flat list use:
Collectors.toList // ArrayList

If you want a map or dictionary-like structure, use:
Collectors.groupingBy // Map

The .groupingBy() collector that I use above allows us to aggregate our stream into groups based on a common characteristic. The Maps that .groupingBy() produces are very useful because each grouping is effectively a key/value pair (think Map.Entry), mapping a key to the list of elements that share it.
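
As a quick sketch of the difference (again assuming the usual java.util and java.util.stream imports, and using plain ticker strings instead of PricingRecords):

//a flat list: every element lands in a single List
List<String> flat = Stream.of("AMD", "APL", "IBM", "GOOG")
                          .collect(Collectors.toList());

//a dictionary-like structure: elements grouped under a derived key, here the first letter of each symbol
Map<Character, List<String>> grouped = Stream.of("AMD", "APL", "IBM", "GOOG")
                                             .collect(Collectors.groupingBy(s -> s.charAt(0)));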

For completeness I should provide the PricingRecord class:

public class PricingRecord {
	private String symbol;
	private String date;
	private double open;
	private double high;
	private double low;
	private double close;
	private int volume;
	private double adjustedClose;

	public PricingRecord (String symbol, String date, String open, String high, String low, String close, String volume, String adjustedClose){
		this.setSymbol(symbol);
		this.date = date;
		this.open = Double.parseDouble(open);
		this.high = Double.parseDouble(high);
		this.low = Double.parseDouble(low);
		this.close = Double.parseDouble(close);
		this.volume = Integer.parseInt(volume);
		this.adjustedClose = Double.parseDouble(adjustedClose);
	}

	public String toString(){
		return String.format("symbol=%s,date=%s,open=%.2f,close=%.2f,high=%.2f,low=%.2f,volume=%s,adjustedClose=%.2f", this.symbol, this.date, this.open, this.close, this.high, this.low, this.volume, this.adjustedClose);

	}

	public String getDate() {
		return date;
	}
	public void setDate(String date) {
		this.date = date;
	}
	public double getOpen() {
		return open;
	}
	public void setOpen(double open) {
		this.open = open;
	}
	public double getHigh() {
		return high;
	}
	public void setHigh(double high) {
		this.high = high;
	}
	public double getLow() {
		return low;
	}
	public void setLow(double low) {
		this.low = low;
	}
	public double getClose() {
		return close;
	}
	public void setClose(double close) {
		this.close = close;
	}
	public int getVolume() {
		return volume;
	}
	public void setVolume(int volume) {
		this.volume = volume;
	}
	public double getAdjustedClose() {
		return adjustedClose;
	}
	public void setAdjustedClose(double adjustedClose) {
		this.adjustedClose = adjustedClose;
	}
	public String getSymbol() {
		return symbol;
	}
	public void setSymbol(String symbol) {
		this.symbol = symbol;
	}

}

This is a simple entity class. It just serves to represent our logical notion of a PricingRecord; however, I want you to notice the .toString(). It simply prints out the object, but notice in the last expression of the code example how we’re able to print a concatenation of these objects to the console as a String. The .reduce() function allows us to accumulate the result of each symbol’s data and print it out in a logically separated and intelligible way. Reducers are what I like to think of as “distillers” of information in the streams we process. In this way, they can be made to aggregate and refine information from the stream as it passes through the pipeline.

Finally, in order to run the code example as-is, you’ll need to pull in the jcabi-http library to get the fluent web API that I’m using. Add this to your pom.xml and resolve the imports.

<dependency>
	<groupId>com.jcabi</groupId>
	<artifactId>jcabi-http</artifactId>
	<version>1.8</version>
</dependency>

The introduction of this functional programming model into Java is a leap forward for the platform. It signals a shift not only in the way we write code in the language, but also in how we think about solving problems with it. So-called higher-order problem solving requires higher-order tools. This technology is compelling because it gives us these high-level tools and a fun syntax that allows us to focus on the problem to be solved, not on the cruft it takes to solve it. This makes life better for everyone.

Grokking JBoss Fuse Logs with Logstash

JBoss Fuse, or more generally Apache ServiceMix, ships with a default Log4j layout ConversionPattern. In this article I will show you how to parse your $FUSE_HOME/data/log/fuse.log file, collect its log entries into Elasticsearch, and understand what’s going on in the Kibana UI.

First a few pieces of context.

If you are not familiar with the ELK Stack, please read up on it here.

In this scenario, we are going to use Logstash as our log parser and collector agent. Elasticsearch (ES) will provide the storage and RESTful search interface. And, Kibana will be our UI over the data in ES.

First we need to get the data. In our case, the target we want to analyze is the JBoss Fuse log file. Since my $FUSE_HOME is /opt/jboss-fuse, my log file is at /opt/jboss-fuse/data/log/fuse.log.

2014-05-25 21:11:48,677 | INFO  | .167:49697@61616 | LoggingBrokerPlugin              | 132 - org.apache.activemq.activemq-osgi - 5.9.0.redhat-610379 | Adding destination: Topic:ActiveMQ.Advisory.Connection
2014-05-25 21:11:48,743 | INFO  | .167:49697@61616 | LoggingBrokerPlugin              | 132 - org.apache.activemq.activemq-osgi - 5.9.0.redhat-610379 | Adding Consumer: ConsumerInfo {commandId = 2, responseRequired = true, consumerId = ID:ESBHOST-50593-635366336896487074-1:56:-1:1, destination = ActiveMQ.Advisory.TempQueue,ActiveMQ.Advisory.TempTopic, prefetchSize = 1000, maximumPendingMessageLimit = 0, browser = false, dispatchAsync = false, selector = null, clientId = null, subscriptionName = null, noLocal = true, exclusive = false, retroactive = false, priority = 0, brokerPath = null, optimizedAcknowledge = false, noRangeAcks = false, additionalPredicate = null}
2014-05-25 21:11:48,804 | INFO  | .167:49697@61616 | LoggingBrokerPlugin              | 132 - org.apache.activemq.activemq-osgi - 5.9.0.redhat-610379 | Adding Session: SessionInfo {commandId = 3, responseRequired = false, sessionId = ID:ESBHOST-50593-635366336896487074-1:56:1}
2014-05-25 21:11:48,806 | INFO  | .167:49697@61616 | LoggingBrokerPlugin              | 132 - org.apache.activemq.activemq-osgi - 5.9.0.redhat-610379 | Adding Producer: ProducerInfo {commandId = 4, responseRequired = false, producerId = ID:ESBHOST-50593-635366336896487074-1:56:1:2, destination = null, brokerPath = null, dispatchAsync = false, windowSize = 0, sentCount = 0}
2014-05-25 21:11:48,816 | INFO  | .167:49697@61616 | LoggingBrokerPlugin              | 132 - org.apache.activemq.activemq-osgi - 5.9.0.redhat-610379 | Sending message: ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:ESBHOST-50593-635366336896487074-1:56:1:2:1, originalDestination = null, originalTransactionId = null, producerId = ID:ESBHOST-50593-635366336896487074-1:56:1:2, destination = queue://test.dotnet.testharness, transactionId = null, expiration = 0, timestamp = 1401066708752, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = e3fbd106-0acd-45e7-9045-5710484cf29e, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@593233cd, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = Hello World I am an ActiveMQ Message...}
2014-05-25 21:11:48,816 | INFO  | .167:49697@61616 | LoggingBrokerPlugin              | 132 - org.apache.activemq.activemq-osgi - 5.9.0.redhat-610379 | Adding destination: Queue:test.dotnet.testharness
2014-05-25 21:11:48,882 | INFO  | .167:49697@61616 | LoggingBrokerPlugin              | 132 - org.apache.activemq.activemq-osgi - 5.9.0.redhat-610379 | Removing Producer: ProducerInfo {commandId = 4, responseRequired = false, producerId = ID:ESBHOST-50593-635366336896487074-1:56:1:2, destination = null, brokerPath = null, dispatchAsync = false, windowSize = 0, sentCount = 1}
2014-05-25 21:11:48,883 | INFO  | .167:49697@61616 | LoggingBrokerPlugin              | 132 - org.apache.activemq.activemq-osgi - 5.9.0.redhat-610379 | Removing Session: SessionInfo {commandId = 3, responseRequired = false, sessionId = ID:ESBHOST-50593-635366336896487074-1:56:1}
2014-05-25 21:11:48,884 | INFO  | .167:49697@61616 | LoggingBrokerPlugin              | 132 - org.apache.activemq.activemq-osgi - 5.9.0.redhat-610379 | Removing Consumer: ConsumerInfo {commandId = 2, responseRequired = true, consumerId = ID:ESBHOST-50593-635366336896487074-1:56:-1:1, destination = ActiveMQ.Advisory.TempQueue,ActiveMQ.Advisory.TempTopic, prefetchSize = 1000, maximumPendingMessageLimit = 0, browser = false, dispatchAsync = false, selector = null, clientId = null, subscriptionName = null, noLocal = true, exclusive = false, retroactive = false, priority = 0, brokerPath = null, optimizedAcknowledge = false, noRangeAcks = false, additionalPredicate = null}
2014-05-25 21:11:48,885 | INFO  | .167:49697@61616 | LoggingBrokerPlugin              | 132 - org.apache.activemq.activemq-osgi - 5.9.0.redhat-610379 | Removing Session: SessionInfo {commandId = 0, responseRequired = false, sessionId = ID:ESBHOST-50593-635366336896487074-1:56:-1}
2014-05-25 21:11:48,885 | INFO  | .167:49697@61616 | LoggingBrokerPlugin              | 132 - org.apache.activemq.activemq-osgi - 5.9.0.redhat-610379 | Removing Connection: ConnectionInfo {commandId = 1, responseRequired = true, connectionId = ID:ESBHOST-50593-635366336896487074-1:56, clientId = ID:ESBHOST-50593-635366336896487074-55:0, clientIp = tcp://10.224.14.167:49697, userName = testuser, password = *****, brokerPath = null, brokerMasterConnector = false, manageable = false, clientMaster = false, faultTolerant = false, failoverReconnect = false}

Note that the format is pipe delimited.

This comes from the log4j layout ConversionPattern setup in $FUSE_HOME/etc/org.ops4j.pax.logging.cfg.

Here is mine:

log4j.appender.out.layout.ConversionPattern=%d{ISO8601} | %-5.5p | %-16.16t | %-32.32c{1} | %X{bundle.id} - %X{bundle.name} - %X{bundle.version} | %m%n

Note that it creates 6 fields, delimited by pipes.

To parse this file we’ll need to configure Logstash and tell it how to interpret it.

Logstash is configured with a config file. It can be started on the command line from the Logstash directory, pointing at that config file, like this:

bin/logstash agent -f logstash.config

I have set my logstash.config up to parse this format with a grok section in the filter definition. Grok is a high-level expression syntax for matching and tokenizing text. It builds on regular expressions, so it inherits the power of regex while adding the convenience of higher-level semantic constructs like %{LOGLEVEL:level} and %{GREEDYDATA:messagetext}, where the part after the colon names the field the match is captured into.

Here is my logstash.config:

input {
  file {
    type => "esb"
    path => ["/opt/jboss-fuse/data/log/fuse.log"]
    sincedb_path => "/opt/elk/logstash/sync_db/jboss-fuse"
  }
}

filter {
  if [type] == "esb" {
    grok {      
      match => { 
        message => "%{TIMESTAMP_ISO8601:logdate}%{SPACE}\|%{SPACE}%{LOGLEVEL:level}%{SPACE}\|%{SPACE}%{DATA:thread}%{SPACE}\|%{SPACE}%{DATA:category}%{SPACE}\|%{SPACE}%{DATA:bundle}%{SPACE}\|%{SPACE}%{GREEDYDATA:messagetext}"
      }
      add_tag => ["env_dev"] 
    }
    if "_grokparsefailure" in [tags] {
      multiline {
        pattern => ".*"
        what => "previous"
        add_tag => "notgrok"
      }
    }  
    date {
       match => ["logdate", "yyyy-MM-dd HH:mm:ss,SSS"]
    }
  }
}

output {
  elasticsearch {
    host => "elasticsearch.mydomain.com"
  }
  stdout {
    codec => rubydebug
  }
}

This configuration will cause Logstash to read in the file. Logstash tails the file while it runs and tokenizes each new log entry into the fields specified in the match => string shown above. The fields are:

  • logdate
  • level
  • thread
  • category
  • bundle
  • messagetext

Logstash creates a data structure for each log entry, with the fields populated according to the grok filter’s match pattern:

{
        "message" => "2014-05-25 22:08:04,950 | INFO  | 4.167:8174@61616 | LoggingBrokerPlugin              | 132 - org.apache.activemq.activemq-osgi - 5.9.0.redhat-610379 | Sending message: ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:MYCONSUMERHOST-50593-635366336896487074-1:57:1:2:1, originalDestination = null, originalTransactionId = null, producerId = ID:MYCONSUMERHOST-50593-635366336896487074-1:57:1:2, destination = queue://test.dotnet.testharness, transactionId = null, expiration = 0, timestamp = 1401070084863, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = 0defa0b0-75de-4d51-a127-9aa1e55fa9fc, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@751107eb, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = Hello World I am an ActiveMQ Message...}",
       "@version" => "1",
     "@timestamp" => "2014-05-26T02:08:04.950Z",
           "type" => "esb",
           "host" => "esbhost.domain.com",
           "path" => "/opt/jboss-fuse/data/log/fuse.log",
        "logdate" => "2014-05-25 22:08:04,950",
          "level" => "INFO",
         "thread" => "4.167:8174@61616",
       "category" => "LoggingBrokerPlugin",
         "bundle" => "132 - org.apache.activemq.activemq-osgi - 5.9.0.redhat-610379",
    "messagetext" => "Sending message: ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:MYCONSUMERHOST-50593-635366336896487074-1:57:1:2:1, originalDestination = null, originalTransactionId = null, producerId = ID:MYCONSUMERHOST-50593-635366336896487074-1:57:1:2, destination = queue://test.dotnet.testharness, transactionId = null, expiration = 0, timestamp = 1401070084863, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = 0defa0b0-75de-4d51-a127-9aa1e55fa9fc, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@751107eb, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = Hello World I am an ActiveMQ Message...}",
           "tags" => [
        [0] "env_dev"
    ]
}

These entries are automatically sent to Elasticsearch, which will usually be listening on a different host; you’ll set it up as the aggregation hub for all of your Logstash agents. Elasticsearch exposes its RESTful API on TCP port 9200, and the Logstash agents ship their events to it over the network. Going into how to query Elasticsearch is beyond the scope of this post, but I will cover it in a subsequent article.

Kibana is purely a JavaScript and HTML app; it interfaces with Elasticsearch and provides a powerful analytics interface over the data in ES.

Here is an example of mine, watching “Sending message” activity in JBoss Fuse’s ActiveMQ broker.

Screenshot: Kibana 3 Logstash search dashboard.

Namaste…

Accepting Invalid SSL Certificates in .NET WCF Clients

There are times when SSL certificates are used both to verify identity and to provide transport encryption, and there are cases when only the wire encryption matters. In the latter case, I sometimes need to be able to handle server certificates that are not valid by SSL’s standard rules. This could be because the cert is not signed by a trusted certificate authority, is expired, etc. When I encounter this problem and am, for various reasons, unable to deal with the root cause, there is a simple technique that allows you to plug in your own strategy to determine certificate validity.

Basically you do the following:

  • In a seam of bootstrapping code, you’ll want to add a ServerCertificateValidationCallback to the ServicePointManager (the System.Net class that WCF’s HTTP transport uses)

Here’s a working example that accepts any SSL Certificate as valid:

ServicePointManager.ServerCertificateValidationCallback =
     (object sender, X509Certificate cert, X509Chain chain, SslPolicyErrors errors) 
          => true;

With this patched strategy in place, your WCF client will now accept any SSL certificate it’s given. Note that, in the lambda body, you can put in your own logic to interrogate the parameters for what you consider acceptable:

X509Certificate cert

X509Chain chain

SslPolicyErrors errors

The logic applied can be more or less rigorous than the default certificate validation strategy.  The beauty of this approach is in the power of its simple implementation.

Enjoy..

NTLM Authentication in Java with JCifs

In enterprise software development contexts, one of the frequent needs we encounter is working with file systems remotely via CIFS, sometimes referred to as SMB. If you are using Java in these cases, you’ll want JCifs, a pure-Java CIFS implementation. In this post, I’ll show you how to remotely connect to a Windows share on an Active Directory domain and read/write a file.

In your pom.xml place this dependency:

<dependency>
    <groupId>jcifs</groupId>
    <artifactId>jcifs</artifactId>
    <version>1.3.17</version>
</dependency>

Here is a simple class with a main method that you can run to see how it works:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.UnknownHostException;
import java.util.logging.Level;
import java.util.logging.Logger;

import jcifs.UniAddress;
import jcifs.smb.NtlmPasswordAuthentication;
import jcifs.smb.SmbException;
import jcifs.smb.SmbFile;
import jcifs.smb.SmbFileInputStream;
import jcifs.smb.SmbFileOutputStream;
import jcifs.smb.SmbSession;

public class Program {

	public static void main(String[] args) throws SmbException, UnknownHostException, Exception {
	    
        final UniAddress domainController = UniAddress.getByName("DOMAINCONTROLLERHOSTNAME");
		 
	    final NtlmPasswordAuthentication credentials = new NtlmPasswordAuthentication("DOMAIN.LOCAL", "USERNAME", "SECRET");
	 
	    SmbSession.logon(domainController, credentials);
	    
	    SmbFile smbFile = new SmbFile("smb://localhost/share/foo.txt", credentials);
	    
	    //write to file
	    new SmbFileOutputStream(smbFile).write("testing....and writing to a file".getBytes());
	    
	    //read from file
	    String contents = readFileContents(smbFile);
	    
	    System.out.println(contents);

	}

	private static String readFileContents(SmbFile sFile) throws IOException {

		BufferedReader reader = new BufferedReader(
				new InputStreamReader(new SmbFileInputStream(sFile)));

		StringBuilder builder = new StringBuilder();
		
		String lineReader = null;
		while ((lineReader = reader.readLine()) != null) {
			builder.append(lineReader).append("\n");
		}
		return builder.toString();
	}

}

As you can see, it’s quite trivial to reach out across your network and interact with files and directories in Windows/Samba shares. Being able to authenticate via NTLM is convenient and tidy for this purpose, and the file system API is straightforward and powerful.

Enjoy the power..

Meditation on JavaScript’s setTimeout(), Recursion, and Execution Context

JavaScript’s setTimeout and setInterval functions are for scheduling future work. They provide a way to schedule either a single or a recurring execution of a function. In this article, we’ll play around with some fun and interesting capabilities that can be had using setTimeout.

To get going we’ll need a few examples in place.

Here is a simple Calculator object that we can use for our testing. Note that it has two methods (add and multiply). It also contains a piece of state (value) that it uses to track its computational output.

//an object with methods
function Calculator (seed) {
    return {
          value: seed,
          add: function(b) {
            this.value = this.value || 0; //guard with default accumulator for addition
            this.value += b;
            console.log(this.value);
            return this;
          },
          multiply: function(b) {
            this.value = this.value || 1; //guard with default accumulator for multiplication
            this.value *= b;
            console.log(this.value);
            return this;
          }
    };
}
var calculator = new Calculator();

//basic math fluency
var twentyFive = calculator.add(1 + 1)
                           .add(3)
                           .multiply(5)
                           .value;

Now, let’s have some fun by giving all JavaScript objects in the runtime the ability to defer their own method execution until a time in the future. We do this by extending the Object prototype for the whole JavaScript runtime.

The implementation is as follows. Note that it imbues all objects with a new ability: to run future functions with themselves as the receiver.

if (typeof Object.prototype.later !== 'function') {
	Object.prototype.later = function (msec, method) {
		var that = this,
			args = Array.prototype.slice.apply(arguments, [2]);
		if (typeof method === 'string') {
			method = that[method];
		}
		setTimeout(function() {
			method.apply(that, args);
		}, msec);
		return that;
	};
}

Source: Douglas Crockford.

This in effect shims all objects with an additional behavior called “later”. It changes the receiver of the function call in the future to the object from which later() was invoked.

The function expects these things:

Parameter 1: The number of milliseconds to wait before executing

Parameter 2: Either a function or a string function name to execute

Additional Optional Parameters: The arguments for the method to be executed in the future

The later method takes in the parameters and schedules the function (parameter 2) with its arguments (parameters 3, 4, 5, …) to be executed after the specified number of milliseconds (parameter 1). Note that we use a closure to refer to the receiver (“this”) as “that” during the setup and scheduling phase of its execution. This ensures that the “this” inside the scope of execution is the object on which .later() was called, overriding setTimeout()’s default of binding “this” to the global object. This simple but elegant replacement of the execution context object is perhaps one of the most powerful features of JavaScript. Now that we’ve got this extension to the Object prototype in place, all objects in our system, including Calculator, will have this behavior.

So let’s have some fun with the calculator by exercising the Chaining API it inherited from Object. Here are some tricks taking advantage of the fluent later() method’s ability to drive a series of asynchronous calls over the object from which the expression chain originates. Since calculator’s prototype is Object, it can drive its own api. This example demonstrates several different ways to invoke the later() method in a single expression chain. Let’s turn a calculator into an adding machine.

calculator
	//can pass a method name as a string.  targeting calculator.add
	.later(100, "add", 1)

	//can pass a method reference directly from the object
	.later(1000, calculator.add, 1)

	//can pass an anonymous function.
	.later(100, function(argument) {
		this.value += argument;
	}, 1)

    //close back onto chain originator
    .later(999, function(argument) { calculator.value += argument; }, 1);

//start an async message loop
calculator.later(0, function loop(argument) {
    this.value += argument;
    console.log(this.value);
    this.later(0, loop, argument);
}, "Looping");

Now let’s have some fun with arrays. Since JavaScript arrays ultimately inherit from Object.prototype, and we extended Object’s prototype with the later() method, arrays now also have the later() method.

Process an Array Asynchronously:

var sequence = [1,2,3,4,5];
sequence.forEach(function(n) {
    sequence.later(0, function(i) {
    	console.log("got: " + i);
    }, n);
});

This is cool..

Now let’s replace the execution context object with a new Calculator.

Driving Execution Over an Array – Compute Factorial:

[1,2,3,4,5].forEach(function(n) {
    return this.multiply(n).value;
}, new Calculator());

Let’s Add Laterness:

//scheduled execution
[1,2,3,4,5].forEach(function(n) {
    this.later(100, this.multiply, n);
}, new Calculator());

Decaying Scheduled Array Processing:

[1,2,3,4,5].forEach(function(n) {
    this.wait = (this.wait || 0) + 1000 ;
    this.later(this.wait, this.multiply, n);
}, new Calculator());

Randomly Scheduled Array Processing:

[1,2,3,4,5].forEach(function(n) {
    this.later(Math.floor((Math.random()*1000)+1), this.multiply, n);
}, new Calculator());

Namaste…