Streaming Tranalyzer flows into Apache Kafka
Prerequisites
For this tutorial, it is assumed the user has a basic knowledge of Tranalyzer and that the file t2_aliases has been sourced in ~/.bashrc or ~/.bash_aliases as follows (refer to How to install Tranalyzer for more details):
# $HOME/.bashrc
if [ -f "$T2HOME/scripts/t2_aliases" ]; then
. "$T2HOME/scripts/t2_aliases" # Note the leading `.'
fi
Make sure to replace $T2HOME with the actual path, e.g., $HOME/tranalyzer2-0.9.3.
Dependencies
The kafkaSink plugin uses the librdkafka
library, which may be installed as follows:
OS             | Command
---------------|--------------------------------------------------------------------------
Ubuntu         | sudo apt-get install librdkafka-dev
Arch           | sudo pacman -S librdkafka
Gentoo         | sudo emerge librdkafka
openSUSE       | sudo zypper install librdkafka-devel
Red Hat/Fedora | sudo dnf install librdkafka-devel or sudo yum install librdkafka-devel
macOS          | brew install librdkafka
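To verify the library is visible to the build system, you can query pkg-config (assuming your package installed the rdkafka.pc file, as the development packages above normally do):

# Print the installed librdkafka version
pkg-config --modversion rdkafka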
Required plugins
The only required plugin is the kafkaSink plugin. For this tutorial, we will also load the basicFlow, basicStats and tcpStates plugins. Although not required, those plugins provide useful information, such as source and destination addresses and ports, protocols and basic statistics about packets and bytes. They can be built by running:
t2build basicFlow basicStats kafkaSink tcpStates
Services initialization
The kafkaSink plugin requires ZooKeeper and a Kafka broker running at the address configured in KAFKA_BROKERS, e.g., 127.0.0.1:9092:
t2conf kafkaSink -G KAFKA_BROKERS
KAFKA_BROKERS = "127.0.0.1:9092"
Start the ZooKeeper server and send it to the background:
zookeeper-server-start.sh /etc/kafka/zookeeper.properties &
Start the Kafka server and send it to the background:
kafka-server-start.sh /etc/kafka/server.properties &
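Depending on your broker configuration, topics may be created automatically on first use. If auto-creation is disabled in your setup, you can create the tranalyzer.flows topic manually with the kafka-topics.sh script that ships with Kafka (a minimal single-broker sketch):

# Create the topic with one partition and no replication (single-broker setup)
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create \
    --topic tranalyzer.flows --partitions 1 --replication-factor 1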
Plugin and core configuration
Let’s first look at the default configuration of the kafkaSink plugin:
kafkaSink
vi src/kafkaSink.h
...
/* ========================================================================== */
/* ------------------------ USER CONFIGURATION FLAGS ------------------------ */
/* ========================================================================== */
#define KAFKA_DEBUG 0 // Print debug messages
#define KAFKA_RETRIES 3 // Max. number of retries when message production failed [0 - 255]
/* +++++++++++++++++++++ ENV / RUNTIME - conf Variables +++++++++++++++++++++ */
#define KAFKA_BROKERS "127.0.0.1:9092" // Broker address(es)
// (comma separated list of host[:port])
#define KAFKA_TOPIC "tranalyzer.flows" // Topic to produce to
#define KAFKA_PARTITION -1 // Target partition:
// - >= 0: fixed partition
// - -1: automatic partitioning (unassigned)
/* ========================================================================== */
/* ------------------------- DO NOT EDIT BELOW HERE ------------------------- */
/* ========================================================================== */
...
For this tutorial, we will use the default values.
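If your broker lives at a different address or you prefer another topic name, there is no need to edit the header by hand: the same t2conf tool shown above can change the defines. The values below are hypothetical, and the inner quotes are assumed to be needed to keep the values valid C strings; rebuild the plugin afterwards:

# Hypothetical broker address and topic name; adapt to your setup
t2conf kafkaSink -D KAFKA_BROKERS='"192.168.1.10:9092"'
t2conf kafkaSink -D KAFKA_TOPIC='"my.flows"'
t2build kafkaSink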
The kafkaSink plugin only sends the flows to Kafka. What if you also want to keep track of the errors, warnings and other information produced by Tranalyzer? We’ll look into that later in this tutorial, but first, let us switch off the coloring of the T2 report and make sure to rebuild everything:
t2conf tranalyzer2 -D T2_LOG_COLOR=0
t2build -R
Time to stream
In this tutorial, we will work with a PCAP file, but you could also process the traffic directly from an interface.
Start by downloading the PCAP file we will be using. Make sure your ZooKeeper and Kafka servers are up and running (see the Services initialization section)!
Now run t2:
t2 -r faf-exercise.pcap
It is as simple as that!
Let’s make sure Kafka has received our data!
# Consume messages for tranalyzer.flows topic
$ kafka-console-consumer \
--bootstrap-server localhost:9092 \
--from-beginning \
--topic tranalyzer.flows
Sending stdout and stderr
Sometimes it is interesting to store the logs produced by T2.
The errors ([ERR]), warnings ([WRN]) and information ([INF]) are particularly useful.
Errors are sent to stderr, while warnings and information are sent to stdout.
We will mirror that in Kafka by sending them to two different topics, namely tranalyzer.err and tranalyzer.out.
For simplicity, we will use the kcat (formerly kafkacat) tool.
Let’s look at its syntax (or at least at the parts we will need):
kcat
...
General options:
-C | -P | -L | -Q Mode: Consume, Produce, Metadata List, Query mode
-G <group-id> Mode: High-level KafkaConsumer (Kafka >=0.9 balanced consumer groups)
Expects a list of topics to subscribe to
-t <topic> Topic to consume from, produce to, or list
-p <partition> Partition
-b <brokers,..> Bootstrap broker(s) (host[:port])
...
For our tutorial, we will need the -P, -b and -t options.
The -P option indicates we want to produce data, -b identifies the address of the Kafka broker and -t is used to select a topic.
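Before wiring kcat into T2, a quick smoke test does not hurt; here we produce one message and read it back (the topic name tranalyzer.test is just a placeholder):

# Produce a single test message
echo "hello from kcat" | kcat -P -b 127.0.0.1:9092 -t tranalyzer.test
# Consume it back; -e exits once the end of the partition is reached
kcat -C -b 127.0.0.1:9092 -t tranalyzer.test -e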
Now that this is out of the way, let us look at the full command to see how we can redirect stderr and stdout separately:
$ t2 -r faf-exercise.pcap \
1> >(grep -F -e "[INF]" -e "[WRN]" | kcat -P -b 127.0.0.1:9092 -t tranalyzer.out) \
2> >(kcat -P -b 127.0.0.1:9092 -t tranalyzer.err)
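Here Bash process substitution does the heavy lifting: each >(...) runs its command in the background with a pipe attached, so 1> feeds T2’s stdout through grep (keeping only the [INF] and [WRN] lines) into the tranalyzer.out topic, while 2> streams stderr, i.e. the [ERR] lines, straight to tranalyzer.err.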
We can access the tranalyzer.flows topic as before:
# Consume messages for tranalyzer.flows topic
$ kafka-console-consumer \
--bootstrap-server localhost:9092 \
--from-beginning \
--topic tranalyzer.flows
If we want to consume messages from another topic, we just have to specify a different -t option.
Consume messages for tranalyzer.err topic:
$ kafka-console-consumer \
    --bootstrap-server localhost:9092 \
    --from-beginning \
    --topic tranalyzer.err
Consume messages for tranalyzer.out topic:
$ kafka-console-consumer \
    --bootstrap-server localhost:9092 \
    --from-beginning \
    --topic tranalyzer.out
Conclusion
Don’t forget to reset the plugin configuration for the next tutorial.
t2conf tranalyzer2 -D T2_LOG_COLOR=1
t2build -R tranalyzer2
Have fun analyzing!