Grokking JBoss Fuse Logs with Logstash

JBoss Fuse, and more generally Apache ServiceMix, ships with a default log4j layout ConversionPattern. In this article I will show you how to parse your $FUSE_HOME/data/log/fuse.log file, collect its log entries into Elasticsearch, and understand what's going on in the Kibana UI.

First, a few pieces of context.

If you are not familiar with the ELK Stack, please read up on it here.

In this scenario, we are going to use Logstash as our log parser and collector agent. Elasticsearch (ES) will provide the storage and RESTful search interface. And Kibana will be our UI over the data in ES.

First we need to get the data. In our case, the target we want to analyze is the JBoss Fuse log file. Since my $FUSE_HOME is /opt/jboss-fuse, my log file is at /opt/jboss-fuse/data/log/fuse.log. Here is a sample of its contents:

2014-05-25 21:11:48,677 | INFO  | .167:49697@61616 | LoggingBrokerPlugin              | 132 - org.apache.activemq.activemq-osgi - 5.9.0.redhat-610379 | Adding destination: Topic:ActiveMQ.Advisory.Connection
2014-05-25 21:11:48,743 | INFO  | .167:49697@61616 | LoggingBrokerPlugin              | 132 - org.apache.activemq.activemq-osgi - 5.9.0.redhat-610379 | Adding Consumer: ConsumerInfo {commandId = 2, responseRequired = true, consumerId = ID:ESBHOST-50593-635366336896487074-1:56:-1:1, destination = ActiveMQ.Advisory.TempQueue,ActiveMQ.Advisory.TempTopic, prefetchSize = 1000, maximumPendingMessageLimit = 0, browser = false, dispatchAsync = false, selector = null, clientId = null, subscriptionName = null, noLocal = true, exclusive = false, retroactive = false, priority = 0, brokerPath = null, optimizedAcknowledge = false, noRangeAcks = false, additionalPredicate = null}
2014-05-25 21:11:48,804 | INFO  | .167:49697@61616 | LoggingBrokerPlugin              | 132 - org.apache.activemq.activemq-osgi - 5.9.0.redhat-610379 | Adding Session: SessionInfo {commandId = 3, responseRequired = false, sessionId = ID:ESBHOST-50593-635366336896487074-1:56:1}
2014-05-25 21:11:48,806 | INFO  | .167:49697@61616 | LoggingBrokerPlugin              | 132 - org.apache.activemq.activemq-osgi - 5.9.0.redhat-610379 | Adding Producer: ProducerInfo {commandId = 4, responseRequired = false, producerId = ID:ESBHOST-50593-635366336896487074-1:56:1:2, destination = null, brokerPath = null, dispatchAsync = false, windowSize = 0, sentCount = 0}
2014-05-25 21:11:48,816 | INFO  | .167:49697@61616 | LoggingBrokerPlugin              | 132 - org.apache.activemq.activemq-osgi - 5.9.0.redhat-610379 | Sending message: ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:ESBHOST-50593-635366336896487074-1:56:1:2:1, originalDestination = null, originalTransactionId = null, producerId = ID:ESBHOST-50593-635366336896487074-1:56:1:2, destination = queue://test.dotnet.testharness, transactionId = null, expiration = 0, timestamp = 1401066708752, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = e3fbd106-0acd-45e7-9045-5710484cf29e, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@593233cd, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = Hello World I am an ActiveMQ Message...}
2014-05-25 21:11:48,816 | INFO  | .167:49697@61616 | LoggingBrokerPlugin              | 132 - org.apache.activemq.activemq-osgi - 5.9.0.redhat-610379 | Adding destination: Queue:test.dotnet.testharness
2014-05-25 21:11:48,882 | INFO  | .167:49697@61616 | LoggingBrokerPlugin              | 132 - org.apache.activemq.activemq-osgi - 5.9.0.redhat-610379 | Removing Producer: ProducerInfo {commandId = 4, responseRequired = false, producerId = ID:ESBHOST-50593-635366336896487074-1:56:1:2, destination = null, brokerPath = null, dispatchAsync = false, windowSize = 0, sentCount = 1}
2014-05-25 21:11:48,883 | INFO  | .167:49697@61616 | LoggingBrokerPlugin              | 132 - org.apache.activemq.activemq-osgi - 5.9.0.redhat-610379 | Removing Session: SessionInfo {commandId = 3, responseRequired = false, sessionId = ID:ESBHOST-50593-635366336896487074-1:56:1}
2014-05-25 21:11:48,884 | INFO  | .167:49697@61616 | LoggingBrokerPlugin              | 132 - org.apache.activemq.activemq-osgi - 5.9.0.redhat-610379 | Removing Consumer: ConsumerInfo {commandId = 2, responseRequired = true, consumerId = ID:ESBHOST-50593-635366336896487074-1:56:-1:1, destination = ActiveMQ.Advisory.TempQueue,ActiveMQ.Advisory.TempTopic, prefetchSize = 1000, maximumPendingMessageLimit = 0, browser = false, dispatchAsync = false, selector = null, clientId = null, subscriptionName = null, noLocal = true, exclusive = false, retroactive = false, priority = 0, brokerPath = null, optimizedAcknowledge = false, noRangeAcks = false, additionalPredicate = null}
2014-05-25 21:11:48,885 | INFO  | .167:49697@61616 | LoggingBrokerPlugin              | 132 - org.apache.activemq.activemq-osgi - 5.9.0.redhat-610379 | Removing Session: SessionInfo {commandId = 0, responseRequired = false, sessionId = ID:ESBHOST-50593-635366336896487074-1:56:-1}
2014-05-25 21:11:48,885 | INFO  | .167:49697@61616 | LoggingBrokerPlugin              | 132 - org.apache.activemq.activemq-osgi - 5.9.0.redhat-610379 | Removing Connection: ConnectionInfo {commandId = 1, responseRequired = true, connectionId = ID:ESBHOST-50593-635366336896487074-1:56, clientId = ID:ESBHOST-50593-635366336896487074-55:0, clientIp = tcp://10.224.14.167:49697, userName = testuser, password = *****, brokerPath = null, brokerMasterConnector = false, manageable = false, clientMaster = false, faultTolerant = false, failoverReconnect = false}

Note that the format is pipe delimited.

This comes from the log4j layout ConversionPattern configured in $FUSE_HOME/etc/org.ops4j.pax.logging.cfg.

Here is mine:

log4j.appender.out.layout.ConversionPattern=%d{ISO8601} | %-5.5p | %-16.16t | %-32.32c{1} | %X{bundle.id} - %X{bundle.name} - %X{bundle.version} | %m%n

Note that it creates 6 fields, delimited by pipes.
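
For reference, here is how each conversion specifier maps to a field (the names on the right are the ones I will assign in Logstash below):

%d{ISO8601}                                            → logdate
%-5.5p                                                 → level (padded/truncated to 5 chars)
%-16.16t                                               → thread (log4j keeps only the last 16 chars, which is why the IPs above are clipped)
%-32.32c{1}                                            → category (last segment of the logger name)
%X{bundle.id} - %X{bundle.name} - %X{bundle.version}   → bundle (OSGi details from the MDC)
%m%n                                                   → messagetext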

To parse this file we’ll need to configure Logstash and tell it how to interpret it.

Logstash is configured with a config file. From the Logstash directory, it can be started on the command line with the config file like this:

bin/logstash agent -f logstash.config
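
If your Logstash build supports it (the 1.4.x series does), you can also sanity-check the config file before starting the agent:

bin/logstash agent -f logstash.config --configtest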

I have set my logstash.config up to parse this format with a grok section in the filter definition. Grok is a high-level expression syntax for matching and tokenizing text. It builds on regular expressions, so it inherits the power of regex while adding the convenience of higher-level semantic constructs like %{LOGLEVEL:fieldname}, %{GREEDYDATA:fieldname}, etc.
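
For example, just the front of the pattern we will use below tokenizes the start of a log entry like this:

pattern:  %{TIMESTAMP_ISO8601:logdate}%{SPACE}\|%{SPACE}%{LOGLEVEL:level}
input:    2014-05-25 21:11:48,677 | INFO
result:   logdate = "2014-05-25 21:11:48,677", level = "INFO"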

Here is my logstash.config:

input {
  file {
    type => "esb"
    path => ["/opt/jboss-fuse/data/log/fuse.log"]
    # where Logstash records how far into the file it has read
    sincedb_path => "/opt/elk/logstash/sync_db/jboss-fuse"
  }
}

filter {
  if [type] == "esb" {
    # tokenize the six pipe-delimited fields described above
    grok {
      match => {
        "message" => "%{TIMESTAMP_ISO8601:logdate}%{SPACE}\|%{SPACE}%{LOGLEVEL:level}%{SPACE}\|%{SPACE}%{DATA:thread}%{SPACE}\|%{SPACE}%{DATA:category}%{SPACE}\|%{SPACE}%{DATA:bundle}%{SPACE}\|%{SPACE}%{GREEDYDATA:messagetext}"
      }
      add_tag => ["env_dev"]
    }
    # anything that fails the grok (stack traces and other continuation
    # lines) gets folded into the previous event
    if "_grokparsefailure" in [tags] {
      multiline {
        pattern => ".*"
        what => "previous"
        add_tag => "notgrok"
      }
    }
    # use the log entry's own timestamp as the event's @timestamp
    date {
      match => ["logdate", "yyyy-MM-dd HH:mm:ss,SSS"]
    }
  }
}

output {
  elasticsearch {
    host => "elasticsearch.mydomain.com"
  }
  # echo each parsed event to the console as well -- handy while testing
  stdout {
    codec => rubydebug
  }
}

This configuration causes Logstash to read in the file. Logstash tails the file while running and tokenizes each new log entry into the fields named in the match string, as shown. The fields are:

  • logdate
  • level
  • thread
  • category
  • bundle
  • messagetext

Logstash creates a data structure for each log entry, populated with the fields specified in the grok filter's match pattern. Here is one such event, as printed by the rubydebug codec:

{
        "message" => "2014-05-25 22:08:04,950 | INFO  | 4.167:8174@61616 | LoggingBrokerPlugin              | 132 - org.apache.activemq.activemq-osgi - 5.9.0.redhat-610379 | Sending message: ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:MYCONSUMERHOST-50593-635366336896487074-1:57:1:2:1, originalDestination = null, originalTransactionId = null, producerId = ID:MYCONSUMERHOST-50593-635366336896487074-1:57:1:2, destination = queue://test.dotnet.testharness, transactionId = null, expiration = 0, timestamp = 1401070084863, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = 0defa0b0-75de-4d51-a127-9aa1e55fa9fc, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@751107eb, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = Hello World I am an ActiveMQ Message...}",
       "@version" => "1",
     "@timestamp" => "2014-05-26T02:08:04.950Z",
           "type" => "esb",
           "host" => "esbhost.domain.com",
           "path" => "/opt/jboss-fuse/data/log/fuse.log",
        "logdate" => "2014-05-25 22:08:04,950",
          "level" => "INFO",
         "thread" => "4.167:8174@61616",
       "category" => "LoggingBrokerPlugin",
         "bundle" => "132 - org.apache.activemq.activemq-osgi - 5.9.0.redhat-610379",
    "messagetext" => "Sending message: ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:MYCONSUMERHOST-50593-635366336896487074-1:57:1:2:1, originalDestination = null, originalTransactionId = null, producerId = ID:MYCONSUMERHOST-50593-635366336896487074-1:57:1:2, destination = queue://test.dotnet.testharness, transactionId = null, expiration = 0, timestamp = 1401070084863, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = 0defa0b0-75de-4d51-a127-9aa1e55fa9fc, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@751107eb, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = Hello World I am an ActiveMQ Message...}",
           "tags" => [
        [0] "env_dev"
    ]
}

These events are automatically sent to Elasticsearch, which will usually be listening on a different host; you set it up as the aggregation hub for all of your Logstash agents. Elasticsearch exposes its REST API on TCP port 9200 and accepts input from the Logstash agents via HTTP requests to that port. Going into how to query Elasticsearch is beyond the scope of this post, but I will cover it in a subsequent article.
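
Still, if you want a quick sanity check that events are arriving, one request against the REST API will do. (Logstash writes to daily logstash-YYYY.MM.DD indices by default; the hostname below is from my setup.)

curl 'http://elasticsearch.mydomain.com:9200/logstash-*/_search?q=type:esb&size=1&pretty'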

Kibana is purely a JavaScript and HTML app; it interfaces with Elasticsearch and provides a powerful analytics interface over the data in ES.

Here is an example of mine, watching message-send activity in JBoss Fuse's ActiveMQ broker.

[Screenshot: data_transfers_dashboard]

Namaste…

Accepting Invalid SSL Certificates in .NET WCF Clients

There are times when SSL certificates are used both to verify identity and to establish TLS, and there are cases when only the wire encryption matters.  In the latter case, I sometimes need to be able to handle server certificates that are not valid by SSL's standard rules.  This could be because the cert is not signed by a trusted certificate authority, is expired, etc.  When I encounter this problem and am, for various reasons, unable to deal with the root cause, there is a simple technique that allows you to plug in your own strategy for determining certificate validity.

Basically you do the following:

  • In a seam of bootstrapping code, add a ServerCertificateValidationCallback to ServicePointManager (it lives in System.Net, and WCF's HTTP transports route their certificate checks through it)

Here’s a working example that accepts any SSL Certificate as valid:

using System.Net;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;

ServicePointManager.ServerCertificateValidationCallback =
    (object sender, X509Certificate cert, X509Chain chain, SslPolicyErrors errors) => true;

With this patched strategy in place, your WCF client will now accept any SSL certificate it's given. Note that, in the lambda body, you can put in your own logic to interrogate the parameters for what you consider acceptable:

  • X509Certificate cert
  • X509Chain chain
  • SslPolicyErrors errors

The logic you apply can be more or less rigorous than the default certificate validation strategy, as the sketch below shows.  The beauty of this approach is in the power of its simple implementation.
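
As a sketch (not production code), this callback accepts anything the default rules pass, and otherwise trusts exactly one known certificate by thumbprint; the thumbprint value here is a made-up placeholder:

// assumes the same usings as the example above
ServicePointManager.ServerCertificateValidationCallback =
    (sender, cert, chain, errors) =>
    {
        // anything the default validation already accepts is fine
        if (errors == SslPolicyErrors.None)
            return true;

        // otherwise, allow only one known certificate (placeholder thumbprint)
        var cert2 = new X509Certificate2(cert);
        return cert2.Thumbprint == "0123456789ABCDEF0123456789ABCDEF01234567";
    };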

Enjoy..