Module t2py.T2

Expand source code
#!/usr/bin/env python3

import json
import threading
import time

from os.path import expanduser, isdir, isfile, join
from typing import Any, Dict, Iterator, List, Union

from .T2Plugin import T2Plugin
from .T2Utils import T2Utils


class T2:

    """Manage several `t2py.T2Plugin` objects and run T2, convert/display flow file, ...

    Parameters
    ----------
    pcap: str, default: None
        Path to a pcap file.
    iface: str, default: None
        Name of a network interface.
    pcap_list : str or list of str, default: None

        - Path to a list of pcap files, e.g., `'/tmp/myPcaps.txt'`.
        - Or list of path to pcap files, e.g., `['file1.pcap', 'file2.pcap']`.

    output_prefix: str, default: None
        If `None`, automatically derived from input.
    packet_mode: bool, default: False
        Activate Tranalyzer2 packet mode.
    plugin_folder: str, default: None
        Path to the plugin folder.
        If `None`, default to `$HOME/.tranalyzer/plugins`.
    loading_list: str, default: None
        Path to a plugin loading list.
    plugins: list of str, default: None,
        A list of plugin names to load.
    output_format: str or list of str, default: None
        Select and configure the sink plugin(s) required to generate the requested output format.
        See also `T2.add_output_format()`.
    bpf: str, default: None
        A BPF filter.
    streaming: bool, default: False
        If `True`, then `T2.stream()` can be used to yield one flow at a time.

    Raises
    ------
    OSError
        If specified interface `iface` does not exist locally.
    RuntimeError
        If more than one input (`pcap`, `pcap_list`, `iface`) is specified.
    """

    plugin_folder: str = None
    """Path to the plugin folder. If `None`, default to `'$HOME/.tranalyzer/plugins'`."""

    def __init__(
            self,
            pcap: str = None,
            iface: str = None,
            pcap_list: Union[str, List[str]] = None,
            output_prefix: str = None,
            packet_mode: bool = False,
            plugin_folder: str = None,
            loading_list: str = None,
            plugins: List[str] = None,
            output_format: Union[str, List[str]] = None,
            bpf: str = None,
            streaming: bool = False
    ):

        if (pcap and pcap_list) or (pcap and iface) or (iface and pcap_list):
            raise RuntimeError('Only one input (pcap, pcap_list, iface) may be specified')
        elif iface and iface not in T2Utils.network_interfaces():
            raise OSError(f'Interface {iface} does not exist locally')

        # Private variables
        self._whitelisted_plugins = []
        self._default_plugin_folder = None
        self._tranalyzer2 = None
        self._plugins = dict()
        self._conn = None
        self._streaming = False

        # Input
        self.pcap = pcap
        self.iface = iface
        self.pcap_list = pcap_list

        # Output
        self.output_prefix = output_prefix
        self.packet_mode = packet_mode

        # Options
        self.bpf = bpf
        self.plugin_folder = plugin_folder
        self.loading_list = loading_list

        self.streaming = streaming

        if plugins:
            self.add_plugins(plugins)

        if output_format:
            self.add_output_format(output_format)

    def add_output_format(
            self,
            extension: Union[str, List[str]]
    ):
        """Select and configure the sink plugin(s) required to generate the requested output format.

        Parameters
        ----------
        extension : str or list of str
            `extension` may be any of:

            - `'bin'`       -> binSink
            - `'bin.gz'`    -> binSink + `GZ_COMPRESS = 1`
            - `'csv'`       -> txtSink + `SEP_CHR = ','`
            - `'csv.gz'`    -> txtSink + `SEP_CHR = ','` + `GZ_COMPRESS = 1`
            - `'json'`      -> jsonSink
            - `'json.gz'`   -> jsonSink + `GZ_COMPRESS = 1`
            - `'txt'`       -> txtSink
            - `'txt.gz'`    -> txtSink + `GZ_COMPRESS = 1`
            - `'netflow'`   -> netflowSink
            - `'mysql'`     -> mysqlSink
            - `'sqlite'`    -> sqliteSink
            - `'postgres'`  -> psqlSink
            - `'mongo'`     -> mongoSink
            - `'socket'`    -> socketSink
            - `'streaming'` -> socketSink + `CONTENT_TYPE = 2`
            - `'pcap'`      -> pcapd

        Raises
        ------
        RuntimeError
            If one of the specified extension is not supported.

        Examples
        --------
        >>> t2 = T2()
        >>> t2.add_output_format(['json.gz', 'txt'])
        """
        if type(extension) == list:
            extensions = extension
        else:
            extensions = [extension]
        for ext in extensions:
            if ext == 'bin':
                self.add_plugin('binSink')
            elif ext == 'bin.gz':
                self.add_plugin('binSink')
                self.binSink.GZ_COMPRESS = 1
            elif ext == 'csv':
                self.add_plugin('txtSink')
                self.txtSink.SEP_CHR = ','
            elif ext == 'csv.gz':
                self.add_plugin('txtSink')
                self.txtSink.SEP_CHR = ','
                self.txtSink.GZ_COMPRESS = 1
            elif ext == 'txt':
                self.add_plugin('txtSink')
            elif ext == 'txt.gz':
                self.add_plugin('txtSink')
                self.txtSink.GZ_COMPRESS = 1
            elif ext == 'json':
                self.add_plugin('jsonSink')
            elif ext == 'json.gz':
                self.add_plugin('jsonSink')
                self.jsonSink.GZ_COMPRESS = 1
            elif ext == 'netflow':
                self.add_plugin('netflowSink')
            elif ext == 'mysql':
                self.add_plugin('mysqlSink')
            elif ext == 'sqlite':
                self.add_plugin('sqliteSink')
            elif ext == 'postgres':
                self.add_plugin('psqlSink')
            elif ext == 'mongo':
                self.add_plugin('mongoSink')
            elif ext == 'socket':
                self.add_plugin('socketSink')
            elif ext == 'pcap':
                self.add_plugin('pcapd')
            elif ext == 'streaming':
                self.streaming = True
            else:
                raise RuntimeError(f"output format '{ext}' not supported")

    @property
    def default_plugin_folder(self) -> str:
        """Path to the default plugin folder, i.e., `$HOME/.tranalyzer/plugins`."""
        if not self._default_plugin_folder:
            self._default_plugin_folder = join(expanduser('~'), '.tranalyzer', 'plugins')
        return self._default_plugin_folder

    @property
    def tranalyzer2(self) -> T2Plugin:
        """`t2py.T2Plugin` object representing Tranalyzer2 (core)."""
        if not self._tranalyzer2:
            self._tranalyzer2 = T2Plugin('tranalyzer2')
        return self._tranalyzer2

    @property
    def loading_list(self):
        """Get or set the path to a plugin loading list.

        In addition, the setter calls the `T2.add_plugin()` function for each plugin listed in the file.
        """
        return self._loading_list

    @loading_list.setter
    def loading_list(self, loading_list: str):
        self._whitelisted_plugins.clear()
        self._loading_list = loading_list
        if loading_list:
            with open(loading_list) as f:
                for plugin in f:
                    plugin = plugin.strip()
                    if len(plugin) > 0 and not plugin.startswith('#'):
                        self.add_plugin(plugin)
                        self._whitelisted_plugins.append(plugin)

    @property
    def plugins(self) -> Dict[str, T2Plugin]:
        """Dictionary of `t2py.T2Plugin` objects whose key is the plugin name."""
        return self._plugins

    @property
    def t2_exec(self) -> str:
        """Path to the Tranalyzer2 executable."""
        if self.plugin_folder:
            return join(self.plugin_folder, 'bin', 'tranalyzer')
        else:
            return join(self.default_plugin_folder, 'bin', 'tranalyzer')

    @property
    def streaming(self):
        """Allow `tranalyzer2` to run in streaming mode (call `T2.stream()` to access flows one by one)."""
        return self._streaming

    @streaming.setter
    def streaming(self, activate: bool):
        if activate:
            self.add_plugin('socketSink')
            self.socketSink.CONTENT_TYPE = 2
        self._streaming = activate

    def headers_file(self) -> str:
        """Return the filename of the headers file.

        Returns
        -------
        str
            The filename of the headers file.

        See Also
        --------
        T2.headers : Return Tranalyzer2 `_headers.txt` file contents.
        T2.print_headers : Print the contents of Tranalyzer2 `_headers.txt` file.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
        >>> t2.run()
        >>> t2.headers_file()
        '/tmp/file_headers.txt'
        """
        output_prefix = self._output_prefix()
        return output_prefix + '_headers.txt' if output_prefix else None

    def flow_file_json(self) -> str:
        """Return the filename of the JSON flow file.

        Returns
        -------
        str
            The filename of the JSON flow file.

        See Also
        --------
        T2.flows_json : Return Tranalyzer2 `_flows.json` file contents as an array of JSON objets.
        T2.print_flows_json : Print the contents of Tranalyzer2 `_flows.json` file.
        T2.flow_file_txt : Return the filename of the TXT flow file.
        T2.flows_txt : Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objets.
        T2.print_flows_txt : Print the contents of Tranalyzer2 `_flows.txt` file.
        T2.flow_file : Return the filename of the JSON (preferred) or TXT flow file.
        T2.flows : Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objets.
        T2.print_flows : Print the contents of Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', output_format='json')
        >>> t2.run()
        >>> t2.flow_file_json()
        '/tmp/file_flows.json'
        """
        output_prefix = self._output_prefix()
        return output_prefix + '_flows.json' if output_prefix else None

    def flow_file_txt(self) -> str:
        """Return the filename of the TXT flow file.

        Returns
        -------
        str
            The filename of the TXT flow file.

        See Also
        --------
        T2.flows_txt : Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objets.
        T2.print_flows_txt : Print the contents of Tranalyzer2 `_flows.txt` file.
        T2.flow_file_json : Return the filename of the JSON flow file.
        T2.flows_json : Return Tranalyzer2 `_flows.json` file contents as an array of JSON objets.
        T2.print_flows_json : Print the contents of Tranalyzer2 `_flows.json` file.
        T2.flow_file : Return the filename of the JSON (preferred) or TXT flow file.
        T2.flows : Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objets.
        T2.print_flows : Print the contents of Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', output_format='csv')
        >>> t2.run()
        >>> t2.flow_file_txt()
        '/tmp/file_flows.txt'
        """
        output_prefix = self._output_prefix()
        return output_prefix + '_flows.txt' if output_prefix else None

    def flow_file(self) -> str:
        """Return the filename of the JSON (preferred) or TXT flow file.

        Returns
        -------
        str
            The filename of the flow file.

        See Also
        --------
        T2.flows : Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objets.
        T2.print_flows : Print the contents of Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file.
        T2.flow_file_json : Return the filename of the JSON flow file.
        T2.flows_json : Return Tranalyzer2 `_flows.json` file contents as an array of JSON objets.
        T2.print_flows_json : Print the contents of Tranalyzer2 `_flows.json` file.
        T2.flow_file_txt : Return the filename of the TXT flow file.
        T2.flows_txt : Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objets.
        T2.print_flows_txt : Print the contents of Tranalyzer2 `_flows.txt` file.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', output_format=['json', 'txt'])
        >>> t2.run()
        >>> t2.flow_file()
        '/tmp/file_flows.json'
        """
        if 'jsonSink' in self._plugins:
            return self.flow_file_json()
        elif 'txtSink' in self._plugins:
            return self.flow_file_txt()
        else:
            return None

    def packet_file(self) -> str:
        """Return the filename of the packet file.

        Returns
        -------
        str
            The filename of the packet file.

        See Also
        --------
        T2.packets : Return Tranalyzer2 `_packets.txt` file contents as an array of JSON objets.
        T2.print_packets : Print the contents of Tranalyzer2 `_packets.txt`.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap')
        >>> t2.run(packet_mode=True)
        >>> t2.packet_file()
        '/tmp/file_packets.txt'
        """
        output_prefix = self._output_prefix()
        return output_prefix + '_packets.txt' if output_prefix else None

    def log_file(self) -> str:
        """Return the filename of the log file.

        Returns
        -------
        str
            The filename of the log file.

        See Also
        --------
        T2.log : Return Tranalyzer2 `_log.txt` file contents.
        T2.print_log : Print the contents of Tranalyzer2 `_log.txt` file.
        T2.report : Alias for `T2.log()`.
        T2.print_report : Alias for `T2.print_log()`.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', output_format='txt')
        >>> t2.run()
        >>> t2.log_file()
        '/tmp/file_log.txt'
        """
        output_prefix = self._output_prefix()
        return output_prefix + '_log.txt' if output_prefix else None

    # ====================================================================== #

    def status(self):
        """Print the current configuration status of Tranalyzer2 and the plugins (sorted by plugin numbers).

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', output_format='csv.gz')
        >>> t2.status()
        """
        if self._tranalyzer2:
            t2 = self.tranalyzer2
            pending_changes = len(t2.changes)
            if pending_changes > 0:
                print(f'Tranalyzer2 [{pending_changes} change{"s" if pending_changes > 1 else ""} pending]:')
                for name, value in t2.changes.items():
                    print(f'    {name} = {value}')
                print()
        print('Plugins: ')
        num_plugins = 0
        for plugin_name in sorted(self._plugins, key=lambda name: self._plugins[name].number):
            plugin = self._plugins[plugin_name]
            pending_changes = len(plugin.changes)
            print(f'    {num_plugins+1}: {plugin_name}', end='')
            if pending_changes == 0:
                if len(self._whitelisted_plugins) > 0 and plugin_name not in self._whitelisted_plugins:
                    print(' [NOT IN LOADING LIST]')
                else:
                    print()
            else:
                print(f' [{pending_changes} change{"s" if pending_changes > 1 else ""} pending]', end='')
                if len(self._whitelisted_plugins) > 0 and plugin_name not in self._whitelisted_plugins:
                    print(' [NOT IN LOADING LIST]')
                else:
                    print()
                for name, value in plugin.changes.items():
                    print(f'            {name} = {value}')
            num_plugins += 1
        if num_plugins == 0:
            print('    None')
        print('\nTranalyzer options:')
        if self.pcap:
            print(f'    -r {self.pcap}')
        elif self.iface:
            print(f'    -i {self.iface}')
        elif self.pcap_list:
            if type(self.pcap_list) == list:
                print('    -R /tmp/pcap_list.txt')
            else:
                print(f'    -R {self.pcap_list}')
        if self.output_prefix:
            print(f'    -w {self.output_prefix}')
        if self.loading_list:
            print(f'    -b {self.loading_list}')
        if self.plugin_folder:
            print(f'    -p {self.plugin_folder}')
        if self.packet_mode:
            print('    -s')
        print('    -l')
        if self.bpf:
            print(f'    BPF filter: {self.bpf}')

    # ====================================================================== #

    def clear_plugins(self):
        """Clear the list of plugins.

        Examples
        --------
        >>> t2 = T2(plugins=['arpDecode', 'basicStats'])
        >>> t2.clear_plugins()
        >>> t2.list_plugins()
        []
        """
        self._remove_dict_entries(list(self._plugins.keys()))
        self._plugins.clear()

    def set_plugins(
            self,
            plugins: List[str] = None
    ):
        """Overwrite the current list of plugins.

        Parameters
        ----------
        plugins : list of str, default: None
            The list of plugins to use.
            If `None`, empty the list of plugins.

        See Also
        --------
        T2.list_plugins : Alphabetically list the currently active plugins.

        Examples
        --------
        >>> t2 = T2()
        >>> t2.set_plugins(['arpDecode', 'basicStats'])
        >>> t2.list_plugins()
        ['arpDecode', 'basicStats']
        >>> type(t2.arpDecode)
        <class 't2py.T2Plugin.T2Plugin'>
        >>> t2.basicStats.description
        'Basic statistics'
        """
        self.clear_plugins()
        if plugins:
            self._plugins = {plugin: T2Plugin(plugin) for plugin in plugins}
            self._add_dict_entries(plugins)

    def add_plugin(
            self,
            plugin: str
    ):
        """Add a plugin to the list of plugins.

        Parameters
        ----------
        plugin : str
            The name of the plugin to add.

        See Also
        --------
        T2.add_plugins : Add a list of plugins to the list of plugins.
        T2.remove_plugin : Remove a plugin from the list of plugins.
        T2.remove_plugins : Remove a list of plugins from the list of plugins.

        Examples
        --------
        >>> t2 = T2()
        >>> t2.add_plugin('arpDecode')
        >>> type(t2.arpDecode)
        <class 't2py.T2Plugin.T2Plugin'>
        >>> t2.arpDecode.number
        '179'
        """
        if plugin not in self._plugins:
            self._plugins[plugin] = T2Plugin(plugin)
            self._add_dict_entry(plugin)

    def add_plugins(
            self,
            plugins: List[str]
    ):
        """Add a list of plugins to the list of plugins.

        Parameters
        ----------
        plugins : list of str
            The list of plugins to add.

        See Also
        --------
        T2.add_plugin : Add a plugin to the list of plugins.
        T2.remove_plugin : Remove a plugin from the list of plugins.
        T2.remove_plugins : Remove a list of plugins from the list of plugins.

        Examples
        --------
        >>> t2 = T2()
        >>> t2.add_plugins(['arpDecode', 'basicStats'])
        >>> type(t2.arpDecode)
        <class 't2py.T2Plugin.T2Plugin'>
        >>> t2.basicStats.description
        'Basic statistics'
        """
        for plugin in plugins:
            self.add_plugin(plugin)

    def remove_plugin(
            self,
            plugin: str
    ):
        """Remove a plugin from the list of plugins.

        Parameters
        ----------
        plugin : str
            The name of the plugin to remove.

        See Also
        --------
        T2.add_plugin : Add a plugin to the list of plugins.
        T2.add_plugins : Add a list of plugins to the list of plugins.
        T2.remove_plugins : Remove a list of plugins from the list of plugins.

        Examples
        --------
        >>> t2 = T2(plugins=['arpDecode', 'basicStats'])
        >>> t2.remove_plugin('arpDecode')
        >>> t2.list_plugins()
        ['basicStats']
        """
        self._remove_dict_entry(plugin)
        self._plugins.pop(plugin)

    def remove_plugins(
            self,
            plugins: List[str]
    ):
        """Remove a list of plugins from the list of plugins.

        Parameters
        ----------
        plugins : list of str
            The list of plugins to remove.

        See Also
        --------
        T2.add_plugin : Add a plugin to the list of plugins.
        T2.add_plugins : Add a list of plugins to the list of plugins.
        T2.remove_plugin : Remove a plugin from the list of plugins.

        Examples
        --------
        >>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags'])
        >>> t2.remove_plugins(['arpDecode', 'basicStats'])
        >>> t2.list_plugins()
        ['tcpFlags']
        """
        for plugin in plugins:
            self.remove_plugin(plugin)

    # =========================================================================

    def list_plugins(self):
        """Alphabetically list the loaded plugins.

        Examples
        --------
        >>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags'])
        >>> t2.list_plugins()
        ['arpDecode', 'basicStats', 'tcpFlags']
        >>> t2.arpDecode.description
        'Address Resolution Protocol (ARP)'
        """
        return sorted(list(self._plugins.keys()))

    def create_plugin_list(
            self,
            plugins: List[str] = None,
            outfile: str = None,
            verbose: bool = False
    ):
        """Write a plugin loading list.

        The work is delegated to `T2Utils.create_plugin_list()`.

        Parameters
        ----------
        plugins : list of str, default: None
            Names of the plugins to put in the list.
            If `None` (or empty), use the currently active plugins.
        outfile : str, default: None
            Name of the file to save.
            If `None`, default to `'plugins.load'` in the plugin folder
            (`T2.plugin_folder` if set, the default plugin folder otherwise).
        verbose : bool, default: False

            - If `True`, print the output (`stdout` and `stderr`) of the command.
            - If `False`, do not print anything.

        Raises
        ------
        subprocess.CalledProcessError
            If the process exited with a non-zero exit code.

        Examples
        --------
        >>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags'])
        >>> t2.create_plugin_list()
        >>> t2.create_plugin_list(outfile='/tmp/myPlugins.txt')
        >>> t2.create_plugin_list(['basicFlow', 'txtSink'])
        >>> t2.create_plugin_list(['basicFlow', 'txtSink'], '/tmp/myPlugins.txt')
        """
        if not outfile:
            folder = self.plugin_folder or self.default_plugin_folder
            outfile = join(folder, 'plugins.load')
        names = plugins if plugins else list(self._plugins)
        T2Utils.create_plugin_list(names, outfile, verbose=verbose)

    # =========================================================================

    def build(
            self,
            plugin: Union[str, List[str]] = None,
            plugin_folder: str = None,
            force_rebuild: bool = False,
            debug: bool = False,
            verbose: bool = False
    ):
        """Build Tranalyzer2 and the plugins.

        The work is delegated to `T2Utils.build()`.

        Parameters
        ----------
        plugin : str or list of str, default: None
            List of plugin names to build or path to a plugin loading list.
            If `None`, build the currently active plugins plus `tranalyzer2` itself.
        plugin_folder : str, default: None
            Path to the plugin folder.
            If `None`, use `T2.plugin_folder`.
        force_rebuild : bool, default: False
            Force the rebuild of the configuration files (`$ t2build -r ...`).
        debug : bool, default: False
            Build the plugin in debug mode.
        verbose : bool, default: False

            - If `True`, print the output (`stdout` and `stderr`) of the command.
            - If `False`, do not print anything.

        Raises
        ------
        subprocess.CalledProcessError
            If the process exited with a non-zero exit code.

        See Also
        --------
        T2.clean : Remove files generated by `T2.build()`.

        Examples
        --------
        >>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags'])
        >>> t2.build()
        """
        folder = plugin_folder or self.plugin_folder
        if plugin:
            targets = plugin
        else:
            # No explicit selection: active plugins, then the core
            targets = [*self._plugins, 'tranalyzer2']
        T2Utils.build(targets, folder, force_rebuild, debug, verbose=verbose)

    def clean(
            self,
            plugin: Union[str, List[str]] = None,
            verbose: bool = False
    ):
        """Clean Tranalyzer2 and the plugins.

        The work is delegated to `T2Utils.clean()`.

        Parameters
        ----------
        plugin : str or list of str, default: None
            List of plugin names to clean, or path to a plugin loading list.
            If `None`, clean the currently active plugins plus `tranalyzer2` itself.
        verbose : bool, default: False

            - If `True`, print the output (`stdout` and `stderr`) of the command.
            - If `False`, do not print anything.

        Raises
        ------
        subprocess.CalledProcessError
            If the process exited with a non-zero exit code.

        See Also
        --------
        T2.build : Build Tranalyzer2 and the plugins.

        Examples
        --------
        >>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags'])
        >>> t2.build()
        >>> t2.clean()
        """
        if plugin:
            targets = plugin
        else:
            # No explicit selection: active plugins, then the core
            targets = [*self._plugins, 'tranalyzer2']
        T2Utils.clean(targets, verbose=verbose)

    def unload(
            self,
            plugin: Union[str, List[str]] = None,
            plugin_folder: str = None,
            verbose: bool = False
    ):
        """Remove (unload) plugin(s) from the plugin folder.

        The work is delegated to `T2Utils.unload()`.

        Parameters
        ----------
        plugin : str or list of str, default: None
            `plugin` can be:

            - a single plugin, e.g., `'pluginName'`
            - a list of plugins, e.g., `['pluginName1', 'pluginName2']`
            - a plugin loading list, e.g., `'myPlugins.txt'`
            - `all` to apply the operation to all available plugins
            - If `None`, use the currently active plugins.

        plugin_folder : str, default: None
            The plugin folder from where the plugin must be unloaded.
            If `None`, use `T2.plugin_folder`.
        verbose : bool, default: False

            - If `True`, print the output (`stdout` and `stderr`) of the command.
            - If `False`, do not print anything.

        Raises
        ------
        subprocess.CalledProcessError
            If the process exited with a non-zero exit code.

        Examples
        --------
        >>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags'])
        >>> t2.build()
        >>> t2.unload()
        """
        folder = plugin_folder or self.plugin_folder
        targets = plugin or list(self._plugins)
        T2Utils.unload(targets, folder, verbose=verbose)

    def apply_changes(
            self,
            verbose: bool = False
    ):
        """Apply/persist the current configuration to the source files.

        Parameters
        ----------
        verbose : bool, default: False

            - If `True`, print the output (`stdout` and `stderr`) of the command.
            - If `False`, do not print anything.

        Raises
        ------
        subprocess.CalledProcessError
            If one of the processes exited with a non-zero exit code.

        Examples
        --------
        >>> t2 = T2(output_format='csv.gz')
        >>> t2.apply_changes()
        """
        self.tranalyzer2.apply_changes(verbose=verbose)
        for plugin_name, plugin in self._plugins.items():
            plugin.apply_changes(verbose=verbose)

    def discard_changes(self):
        """Discard all non-committed changes made in this session.

        Examples
        --------
        >>> t2 = T2(plugins=['arpDecode', 'txtSink'])
        >>> t2.arpDecode.MAX_IP = 5
        >>> t2.discard_changes()
        """
        for plugin_name, plugin in self._plugins.items():
            plugin.discard_changes()

    def reset(self):
        """Reset all plugins' flags to their default value.

        Examples
        --------
        >>> t2 = T2(plugins=['arpDecode', 'txtSink'])
        >>> t2.arpDecode.MAX_IP = 5
        >>> t2.apply_config()
        >>> t2.reset()
        >>> t2.apply_config()
        """
        for plugin_name, plugin in self._plugins.items():
            plugin.reset()

    def _setup_streaming(self):
        """Start a background thread which waits for the socketSink connection.

        The thread binds a TCP socket to `socketSink.SERVADD`:`socketSink.DPORT`,
        accepts a single connection, stores it in `self._conn` and closes the
        listening socket. This method returns as soon as the thread is started.

        The socketSink plugin must already be in the list of plugins.
        """
        assert 'socketSink' in self._plugins

        # Runs in a separate thread: accept() blocks until a client connects
        def _socket_sink_listener(self):
            import socket
            # The context manager closes the listening socket even if
            # bind(), listen() or accept() raises
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                s.bind((self.socketSink.SERVADD, self.socketSink.DPORT))
                s.listen(1)
                self._conn, _ = s.accept()

        print(f'Setting up streaming server {self.socketSink.SERVADD}:{self.socketSink.DPORT}')
        thread = threading.Thread(target=_socket_sink_listener, args=(self,))
        # Daemon thread: a listener still waiting in accept() does not keep the interpreter alive
        thread.daemon = True
        thread.start()

    def run(
            self,
            pcap: str = None,
            iface: str = None,
            pcap_list: Union[str, List[str]] = None,
            output_prefix: str = None,
            packet_mode: bool = False,
            plugins: List[str] = None,
            plugin_folder: str = None,
            loading_list: str = None,
            bpf: str = None,
            rebuild: bool = False,
            streaming: bool = False,
            timeout: int = None,
            verbose: bool = False
    ):
        """Run Tranalyzer2.

        Parameters
        ----------
        pcap : str, default: None
            Path to a pcap file.
        iface : str, default: None
            Name of a network interface.
        pcap_list : str or list of str, default: None
            Path to a list of pcap files, e.g., `'/tmp/myPcaps.txt'`.
            Or list of path to pcap files, e.g., `['file1.pcap', 'file2.pcap']`.
        output_prefix : str, default: None
            If `None`, automatically derived from input.
        packet_mode : bool, default: False
            Activate Tranalyzer2 packet mode.
        plugin_folder : str, default: None
            Path to the plugin folder.
        plugins : list of str, default: None
            The list of plugins to use.
            If `None`, use `T2.list_plugins()`.
        loading_list : str, default: None
            Path to a plugin loading list.
            If `None`, create one using `T2.list_plugins()`.
        bpf : str, default: None
            A BPF filter.
        rebuild : bool, default, default: False
            Apply the current changes and rebuild Tranalyzer2 and the plugins.
        streaming: bool, default: False
            If `True`, then `T2.stream()` can be used to yield one flow at a time.
        timeout : int, default: None
            Number of seconds after which to terminate the process.
            If `None`, run forever or until the end of file is reached.
        verbose : bool, default: False

            - If `True`, print the output (`stdout` and `stderr`) of the command.
            - If `False`, do not print anything.

        Raises
        ------
        OSError
            If specified interface `iface` does not exist locally.
        RuntimeError

            - If none or more than one input (`pcap`, `pcap_list`, `iface`) is specified.
            - If socketSink plugin is not loaded or misconfigured when `T2.streaming` is `True`.

        subprocess.CalledProcessError
            If the process exited with a non-zero exit code.
        subprocess.TimeoutExpired
            If `timeout` was expired and its value reached.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', output_format='txt')
        >>> t2.build()
        >>> t2.run()
        >>> t2.run(packet_mode=True)
        """
        if (pcap and pcap_list) or (pcap and iface) or (iface and pcap_list):
            raise RuntimeError('Only one input (pcap, pcap_list, iface) may be specified')

        if (
            not pcap and not self.pcap and
            not iface and not self.iface and
            not pcap_list and not self.pcap_list
        ):
            raise RuntimeError('An input (pcap, pcap_list or iface) is required')

        if pcap:
            self.pcap = pcap
        elif iface:
            self.iface = iface
        elif pcap_list:
            self.pcap_list = pcap_list

        if output_prefix:
            self.output_prefix = output_prefix

        if plugin_folder:
            self.plugin_folder = plugin_folder

        if loading_list:
            self.loading_list = loading_list
        elif not plugins:
            plugins = list(self._plugins.keys())

        if packet_mode:
            self.packet_mode = packet_mode

        if bpf:
            self.bpf = bpf

        if streaming or self.streaming:
            if 'socketSink' not in self._plugins:
                raise RuntimeError('socketSink plugin is required for running in streaming mode')
            elif (self.socketSink.SOCKTYPE != 1 or
                  self.socketSink.CONTENT_TYPE != 2 or
                  self.socketSink.GZ_COMPRESS != 0):
                raise RuntimeError("Invalid configuration for socketSink in streaming mode: requires 'SOCKTYPE=1', 'CONTENT_TYPE=2' and 'GZ_COMPRESS=0'")
            else:
                self.streaming = True

        if rebuild:
            if not self.loading_list:
                self.apply_changes()
                self.build()
            else:
                self.tranalyzer2.apply_changes()
                self.tranalyzer2.build()
                with open(self.loading_list) as f:
                    for plugin in f:
                        T2Utils.apply_config(plugin.strip())
                self.build(plugin=self.loading_list)

        def _run_tranalyzer(self):
            T2Utils.run_tranalyzer(
                t2_exec=self.t2_exec,
                pcap=self.pcap,
                iface=self.iface,
                pcap_list=self.pcap_list,
                output_prefix=self.output_prefix,
                log_file=True,
                packet_mode=self.packet_mode,
                plugin_folder=self.plugin_folder,
                bpf=self.bpf,
                loading_list=self.loading_list,
                plugins=plugins,
                timeout=timeout,
                verbose=verbose)

        if not self.streaming:
            _run_tranalyzer(self)
        else:
            self._setup_streaming()
            thread = threading.Thread(target=_run_tranalyzer, args=(self,))
            thread.daemon = True  # stop thread when run_tranalyzer returns
            thread.start()

    # =========================================================================
    # Access _headers.txt, _flows.txt, _packets.txt and _log.txt
    # =========================================================================

    def stream(self) -> Iterator[Dict[str, Any]]:
        """Iterator returning flow by flow.

        Raises
        ------
        RuntimeError

            - If the function was called, without configuring streaming mode first.
            - If socketSink did not open connection.

        Yields
        ------
        dict
            The next flow represented as a dict with the column names as key.

        Examples
        --------
        >>> t2 = T2(streaming=True, plugins=['basicFlow'])
        >>> t2.run(pcap='file.pcap')
        >>> for flow in t2.stream():
        ...     print(f"{flow['srcIP']}:{flow['srcPort']} -> {flow['dstIP']}:{flow['dstPort']}")
        """
        if not self.streaming:
            raise RuntimeError("Calling 'stream' without configuring streaming mode first.")
        elif not self._conn:
            time.sleep(2)  # wait for tranalyzer to start and socketSink to connect

        if not self._conn:
            raise RuntimeError('socketSink did not open connection')

        buf = b''
        while True:
            buf += self._conn.recv(1024)
            if not buf:
                break
            lines = buf.split(b'\n')
            for line in lines[:-1]:
                yield json.loads(line)
            buf = lines[-1]

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        if self._conn:
            self._conn.close()

    def _load_file(self, filename: str) -> str:
        """Load a headers/flows/packets/log file.

        Parameters
        ----------
        filename : str
            The name of the file to load.
        """
        if not filename or not isfile(filename):
            return None
        data = None
        with open(filename) as f:
            data = f.read()
        return data

    def headers(self) -> str:
        """Return Tranalyzer2 `_headers.txt` file contents.

        Returns
        -------
        str
            The raw contents of the header file.

        Raises
        ------
        RuntimeError
            If the `_headers.txt` file could not be found.

        See Also
        --------
        T2.headers_file : Return the filename of the headers file.
        T2.print_headers : Print the contents of Tranalyzer2 `_headers.txt` file.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
        >>> t2.build()
        >>> t2.run()
        >>> t2.headers()
        '# Date: 1628090651.486362 sec (Wed 04 Aug 2021 17:24:11 CEST)...'
        """
        name = self.headers_file()
        if not name or not isfile(name):
            raise RuntimeError('Run Tranalyzer with the txtSink plugin to create a headers file first')
        return self._load_file(name)

    def flows_json(self) -> List[Dict[str, Any]]:
        """Return Tranalyzer2 `_flows.json` file contents as an array of JSON objets.

        Returns
        -------
        list of dict
            The list of rows contained in the JSON flow file.
            Each row is represented as a dict with the column names as key.

        Raises
        ------
        RuntimeError
            If the `_flows.json` file could not be found.

        See Also
        --------
        T2.flow_file_json : Return the filename of the JSON flow file.
        T2.flows : Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objets.
        T2.flows_txt : Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objets.
        T2.flow_file : Return the filename of the JSON (preferred) or TXT flow file.
        T2.flow_file_txt : Return the filename of the TXT flow file.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'jsonSink'])
        >>> t2.build()
        >>> t2.run()
        >>> t2.flows_json()
        [{'dir': 'A', 'flowInd': 1, ...}, {'dir': 'B', 'flowInd': 1, ...}, ...]
        """
        name = self.flow_file_json()
        if not name or not isfile(name):
            raise RuntimeError('Run Tranalyzer with the jsonSink plugin to create a JSON flow file first')
        return T2Utils.to_json_array(name)

    def flows_txt(
            self,
            delimiter: str = None
    ) -> List[Dict[str, Any]]:
        """Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objets.

        Parameters
        ----------
        delimiter : str, default: None
            The delimiter used to separate columns in the input file.
            If `None`, default to '\\t' if the txtSink plugin is not loaded else to the value of `txtSink.SEP_CHR`.

        Returns
        -------
        list of dict
            The list of rows contained in the TXT flow file.
            Each row is represented as a dict with the column names as key.

        Raises
        ------
        RuntimeError
            If the `_flows.txt` file could not be found.

        See Also
        --------
        T2.flow_file_txt : Return the filename of the TXT flow file.
        T2.flows : Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objets.
        T2.flows_json : Return Tranalyzer2 `_flows.json` file contents as an array of JSON objets.
        T2.flow_file : Return the filename of the JSON (preferred) or TXT flow file.
        T2.flow_file_json : Return the filename of the JSON flow file.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
        >>> t2.build()
        >>> t2.run()
        >>> t2.flows_txt()
        [{'dir': 'A', 'flowInd': 1, ...}, {'dir': 'B', 'flowInd': 1, ...}, ...]
        """
        name = self.flow_file_txt()
        if not name or not isfile(name):
            raise RuntimeError('Run Tranalyzer with the txtSink plugin to create a flow file first')
        if delimiter is None:
            if 'txtSink' not in self._plugins:
                delimiter = '\t'
            else:
                delimiter = self.txtSink.SEP_CHR.strip('"')
                if delimiter[0] == '\\':
                    if delimiter[1] == 't':
                        delimiter = '\t'
                    elif delimiter[1] == 'n':
                        delimiter = '\n'
                    elif delimiter[1] == 'r':
                        delimiter = '\r'
                    elif delimiter[1] == '\\':
                        delimiter = '\\'
        return T2Utils.to_json_array(name, delimiter)

    def flows(self) -> List[Dict[str, Any]]:
        """Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objets.

        Returns
        -------
        list of dict
            The list of rows contained in the flow file.
            Each row is represented as a dict with the column names as key.

        Raises
        ------
        RuntimeError
            If neither the `_flows.json` nor the `_flows.txt` file could be found.

        See Also
        --------
        T2.flow_file : Return the filename of the JSON (preferred) or TXT flow file.
        T2.flow_file_json : Return the filename of the JSON flow file.
        T2.flow_file_txt : Return the filename of the TXT flow file.
        T2.flows_json : Return Tranalyzer2 `_flows.json` file contents as an array of JSON objets.
        T2.flows_txt : Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objets.
        T2.print_flows : Print the contents of Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file.
        T2.print_flows_json : Print the contents of Tranalyzer2 `_flows.json` file.
        T2.print_flows_txt : Print the contents of Tranalyzer2 `_flows.json` file.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
        >>> t2.build()
        >>> t2.run()
        >>> t2.flows()
        [{'dir': 'A', 'flowInd': 1, ...}, {'dir': 'B', 'flowInd': 1, ...}, ...]
        """
        if 'jsonSink' in self._plugins:
            return self.flows_json()
        elif 'txtSink' in self._plugins:
            return self.flows_txt()
        else:
            raise RuntimeError('Run Tranalyzer with the txtSink or jsonSink plugin to create a flow file first')

    def packets(self) -> List[Dict[str, Any]]:
        """Return Tranalyzer2 `_packets.txt` file contents as an array of JSON objets.

        Returns
        -------
        list of dict
            The list of rows contained in the packet file.
            Each row is represented as a dict with the column names as key.

        Raises
        ------
        RuntimeError
            If the `_packets.txt` file could not be found.

        See Also
        --------
        T2.packet_file : Return the filename of the packet file.
        T2.print_packets : Print the contents of Tranalyzer2 `_packets.txt` file.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'], packet_mode=True)
        >>> t2.build()
        >>> t2.run()
        >>> t2.packets()
        [{'pktNo': 1, 'flowInd': 1, ...}, {'pktNo': 2, 'flowInd': 1, ...}, ...]
        """
        name = self.packet_file()
        if not name or not isfile(name):
            raise RuntimeError("Run Tranalyzer with 'packet_mode=True' to create a packet file first")
        return T2Utils.to_json_array(name)

    def report(self) -> str:
        """Return Tranalyzer2 `_log.txt` file contents.

        Returns
        -------
        str
            The raw contents of the log file.

        Raises
        ------
        RuntimeError
            If the `_log.txt` file could not be found.

        See Also
        --------
        T2.log_file : Return the filename of the log file.
        T2.log : Alias for `T2.report()`

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
        >>> t2.build()
        >>> t2.run()
        >>> t2.report()
        '================================================================================...'
        """
        name = self.log_file()
        if not name or not isfile(name):
            raise RuntimeError("Run Tranalyzer to create a log file first")
        return self._load_file(name)

    def log(self) -> str:
        """Return Tranalyzer2 `_log.txt` file contents.

        Returns
        -------
        str
            The raw contents of the log file.

        Raises
        ------
        RuntimeError
            If the `_log.txt` file could not be found.

        See Also
        --------
        T2.report : Alias for `T2.log()`
        T2.log_file : Return the filename of the log file.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
        >>> t2.build()
        >>> t2.run()
        >>> t2.log()
        '================================================================================...'
        """
        return self.report()

    # =========================================================================
    # Print _headers.txt, _flows.txt, _packets.txt and _log.txt files contents
    # =========================================================================

    def print_headers(self):
        """Print the contents of Tranalyzer2 `_headers.txt` file.

        Raises
        ------
        RuntimeError
            If the header file could not be found.

        See Also
        --------
        T2.headers_file : Return the filename of the headers file.
        T2.headers : Return Tranalyzer2 `_headers.txt` file contents.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
        >>> t2.build()
        >>> t2.run()
        >>> t2.print_headers()
        # Date: 1628090651.486362 sec (Wed 04 Aug 2021 17:24:11 CEST)
        # Tranalyzer 0.8.11 (Anteater), Tarantula.
        ...
        """
        name = self.headers_file()
        if not name or not isfile(name):
            raise RuntimeError('Run Tranalyzer with the txtSink plugin to create a headers file first')
        with open(name) as f:
            print(f.read())

    def print_flows(self):
        """Print the contents of Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file.

        Raises
        ------
        RuntimeError
            If the flow file could not be found.

        See Also
        --------
        T2.flow_file : Return the filename of the JSON (preferred) or TXT flow file.
        T2.flows : Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objets.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
        >>> t2.build()
        >>> t2.run()
        >>> t2.print_flows()
        %dir    flowInd ...
        A       1       ...
        B       1       ...
        ...
        """
        name = self.flow_file()
        if not name or not isfile(name):
            raise RuntimeError('Run Tranalyzer with the txtSink plugin to create a flow file first')
        with open(name) as f:
            print(f.read())

    def print_flows_json(self):
        """Print the contents of Tranalyzer2 `_flows.json` file.

        Raises
        ------
        RuntimeError
            If the JSON flow file could not be found.

        See Also
        --------
        T2.flow_file_json : Return the filename of the JSON flow file.
        T2.flows_json : Return Tranalyzer2 `_flows.json` file contents as an array of JSON objets.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'jsonSink'])
        >>> t2.build()
        >>> t2.run()
        >>> t2.print_flows_json()
        {"dir":"A","flowInd":1,...}
        {"dir":"B","flowInd":1,...}
        ...
        """
        name = self.flow_file_json()
        if not name or not isfile(name):
            raise RuntimeError('Run Tranalyzer with the jsonSink plugin to create a JSON flow file first')
        with open(name) as f:
            print(f.read())

    def print_flows_txt(self):
        """Print the contents of Tranalyzer2 `_flows.txt` file.

        Raises
        ------
        RuntimeError
            If the flow file could not be found.

        See Also
        --------
        T2.flow_file_txt : Return the filename of the TXT flow file.
        T2.flows_txt : Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objets.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
        >>> t2.build()
        >>> t2.run()
        >>> t2.print_flows_txt()
        %dir    flowInd ...
        A       1       ...
        B       1       ...
        ...
        """
        name = self.flow_file_txt()
        if not name or not isfile(name):
            raise RuntimeError('Run Tranalyzer with the txtSink plugin to create a flow file first')
        with open(name) as f:
            print(f.read())

    def print_packets(self):
        """Print the contents of Tranalyzer2 `_packets.txt` file.

        Raises
        ------
        RuntimeError
            If the packet file could not be found.

        See Also
        --------
        T2.packet_file : Return the filename of the packet file.
        T2.packets : Return Tranalyzer2 `_packets.txt` file contents as an array of JSON objets.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
        >>> t2.build()
        >>> t2.run(packet_mode=True)
        >>> t2.print_packets()
        %pktNo  flowInd ...
        1       1       ...
        2       1       ...
        ...
        """
        name = self.packet_file()
        if not name or not isfile(name):
            raise RuntimeError("Run Tranalyzer with 'packet_mode=True' plugin to create a packet file first")
        with open(name) as f:
            print(f.read())

    def print_log(self):
        """Print the contents of Tranalyzer2 `_log.txt` file.

        Raises
        ------
        RuntimeError
            If the log file could not be found.

        See Also
        --------
        T2.print_log : Alias for `T2.print_report()`.
        T2.log_file : Return the filename of the log file.
        T2.log : Return Tranalyzer2 `_log.txt` file contents.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
        >>> t2.build()
        >>> t2.run()
        >>> t2.print_log()
        """
        name = self.log_file()
        if not name or not isfile(name):
            raise RuntimeError("Run Tranalyzer to create a log file first")
        with open(name) as f:
            print(f.read())

    def print_report(self):
        """Print the contents of Tranalyzer2 `_log.txt` file.

        Raises
        ------
        RuntimeError
            If the log file could not be found.

        See Also
        --------
        T2.print_report : Alias for `T2.print_log()`.
        T2.log_file : Return the filename of the log file.
        T2.log : Return Tranalyzer2 `_log.txt` file contents.
        T2.report : Alias for `T2.log()`.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
        >>> t2.build()
        >>> t2.run()
        >>> t2.print_report()
        """
        self.print_log()

    # =========================================================================

    def follow_stream(
            self,
            flow: int,
            output_format: Union[int, str] = 2,
            direction: str = None,
            payload_format: Union[int, str] = 4,
            reassembly: bool = True,
            colors: bool = True
    ) -> Union[str, List[str], List[bytearray], List[Dict[str, Any]]]:
        """Return the reassembled payload of the flow with specified `index` and `direction`.

        Parameters
        ----------
        flow : int
            Index (`flowInd`) of the flow to analyse.
        output_format : int or str, default: 2
            The format to use for the output:

            - 0 or `'payload_only'`
            - 1 or `'info'`: Prefix each payload with packet/flow info
            - 2 or `'json'`
            - 3 or `'reconstruct'`

        direction : str, default: None
            The direction to follow (`'A'` or `'B'`).
            Use `None` to follow both directions.
        payload_format : int or str, default: 4
            The format to use for the payload:

            - 0 or 'ascii'
            - 1 or 'hexdump'
            - 2, 'raw' or 'binary'
            - 3 or 'base64'
            - 4 or 'bytearray'

        reassembly : bool, default: True
            Analyze TCP sequence numbers, reassemble and reorder TCP segments.
        colors : bool, default: True
            Output colors.

        Returns
        -------
        list of bytearray
            If `output_format` is 0 or 2 and `payload_format` is 4 or `'bytearray'`.
        list of dict
            If `output_format` is 2.
        list of str
            If `output_format` is 0 and `payload_format` is not 4 or `'bytearray'`.
        str
            If none of the above applied.

        Raises
        ------
        RuntimeError

            - If the `_packets.txt` file could not be found.
            - If `output_format` or `payload_format` was out of bound.
            - If `direction` was not `'A'`, `'B'` or `None`.
            - If `output_format` was 3 or `'reconstruct'` and `direction` was not `'A'` or `'B'`.
            - If `payload_format` was 4 or `'bytearray'` and `output_format` was 1 or `'info'`.

        subprocess.CalledProcessError
            If the process exited with a non-zero exit code.

        See Also
        --------
        t2py.T2Utils.T2Utils.follow_stream : Call `tawk` `follow_stream()` function with the specified options.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'basicStats', 'tcpFlags', 'txtSink'], packet_mode=True)
        >>> t2.tranalyzer2.SPKTMD_PCNTH = 1
        >>> t2.tranalyzer2.apply_changes()
        >>> t2.build()
        >>> t2.run()
        >>> t2.follow_stream(1, direction='B')
        [...,{"flow": 1, "packet": 125, ..., "payload": bytearray(b'226 Transfer complete.\\r\\n')},....]
        """
        name = self.packet_file()
        if not name or not isfile(name):
            raise RuntimeError("Run Tranalyzer with 'packet_mode=True' to create a packet file first")
        return T2Utils.follow_stream(name, flow, output_format, direction, payload_format, reassembly, colors)

    # =========================================================================

    def to_pandas(
            self,
            infile: str = None,
            delimiter: str = None
    ) -> "pandas.core.frame.DataFrame":
        """Convert the flow file or `infile` to pandas `DataFrame`.

        Parameters
        ----------
        infile : str
            Path to a flow or packet file.
        delimiter : str, default: None
            Field delimiter used in the input file.

        Returns
        -------
        pd.DataFrame
            DataFrame holding the tabular data stored in `infile`.

        Raises
        ------
        RuntimeError
            If `infile` could not be found.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
        >>> t2.build()
        >>> t2.run()
        >>> df = t2.to_pandas()
        >>> type(df)
        <class 'pandas.core.frame.DataFrame'>
        """
        infile = infile if infile else self.flow_file()
        if not infile:
            raise RuntimeError('Run Tranalyzer with the txtSink or jsonSink plugin to create a flow file first')
        return T2Utils.to_pandas(infile, delimiter)

    # =========================================================================
    # Private functions
    # =========================================================================

    def _output_prefix(self) -> str:
        """Return the output prefix (derived from Tranalyzer2 input if not set).

        Returns
        -------
        str
            The output_prefix.
        """
        if self.output_prefix:
            if not isdir(self.output_prefix) or not self.pcap:
                return self.output_prefix
            else:
                filename = self.pcap.split('/')[-1]
                if '.' in filename:
                    return join(self.output_prefix, '.'.join(filename.split('.')[:-1]))
                else:
                    return join(self.output_prefix, filename)
        elif self.pcap:
            if '.' in self.pcap:
                return '.'.join(self.pcap.split('.')[:-1])
            else:
                return self.pcap
        else:
            return None

    def _add_dict_entry(self, plugin: str):
        if plugin not in self.__dict__:
            self.__dict__[plugin] = self._plugins[plugin]

    def _add_dict_entries(self, plugins: List[str]):
        for plugin in plugins:
            self._add_dict_entry(plugin)

    def _remove_dict_entry(self, plugin: str):
        if plugin in self.__dict__:
            del self.__dict__[plugin]

    def _remove_dict_entries(self, plugins: List[str]):
        for plugin in plugins:
            self._remove_dict_entry(plugin)

Classes

class T2 (pcap: str = None, iface: str = None, pcap_list: Union[str, List[str]] = None, output_prefix: str = None, packet_mode: bool = False, plugin_folder: str = None, loading_list: str = None, plugins: List[str] = None, output_format: Union[str, List[str]] = None, bpf: str = None, streaming: bool = False)

Manage several t2py.T2Plugin objects and run T2, convert/display flow file, …

Parameters

pcap : str, default: None
Path to a pcap file.
iface : str, default: None
Name of a network interface.
pcap_list : str or list of str, default: None
  • Path to a list of pcap files, e.g., '/tmp/myPcaps.txt'.
  • Or list of path to pcap files, e.g., ['file1.pcap', 'file2.pcap'].
output_prefix : str, default: None
If None, automatically derived from input.
packet_mode : bool, default: False
Activate Tranalyzer2 packet mode.
plugin_folder : str, default: None
Path to the plugin folder. If None, default to $HOME/.tranalyzer/plugins.
loading_list : str, default: None
Path to a plugin loading list.
plugins : list of str, default: None
A list of plugin names to load.
output_format : str or list of str, default: None
Select and configure the sink plugin(s) required to generate the requested output format. See also T2.add_output_format().
bpf : str, default: None
A BPF filter.
streaming : bool, default: False
If True, then T2.stream() can be used to yield one flow at a time.

Raises

OSError
If specified interface iface does not exist locally.
RuntimeError
If more than one input (pcap, pcap_list, iface) is specified.
Expand source code
class T2:

    """Manage several `t2py.T2Plugin` objects and run T2, convert/display flow file, ...

    Parameters
    ----------
    pcap: str, default: None
        Path to a pcap file.
    iface: str, default: None
        Name of a network interface.
    pcap_list : str or list of str, default: None

        - Path to a list of pcap files, e.g., `'/tmp/myPcaps.txt'`.
        - Or list of path to pcap files, e.g., `['file1.pcap', 'file2.pcap']`.

    output_prefix: str, default: None
        If `None`, automatically derived from input.
    packet_mode: bool, default: False
        Activate Tranalyzer2 packet mode.
    plugin_folder: str, default: None
        Path to the plugin folder.
        If `None`, default to `$HOME/.tranalyzer/plugins`.
    loading_list: str, default: None
        Path to a plugin loading list.
    plugins: list of str, default: None,
        A list of plugin names to load.
    output_format: str or list of str, default: None
        Select and configure the sink plugin(s) required to generate the requested output format.
        See also `T2.add_output_format()`.
    bpf: str, default: None
        A BPF filter.
    streaming: bool, default: False
        If `True`, then `T2.stream()` can be used to yield one flow at a time.

    Raises
    ------
    OSError
        If specified interface `iface` does not exist locally.
    RuntimeError
        If more than one input (`pcap`, `pcap_list`, `iface`) is specified.
    """

    plugin_folder: str = None
    """Path to the plugin folder. If `None`, default to `'$HOME/.tranalyzer/plugins'`."""

    def __init__(
            self,
            pcap: str = None,
            iface: str = None,
            pcap_list: Union[str, List[str]] = None,
            output_prefix: str = None,
            packet_mode: bool = False,
            plugin_folder: str = None,
            loading_list: str = None,
            plugins: List[str] = None,
            output_format: Union[str, List[str]] = None,
            bpf: str = None,
            streaming: bool = False
    ):

        if (pcap and pcap_list) or (pcap and iface) or (iface and pcap_list):
            raise RuntimeError('Only one input (pcap, pcap_list, iface) may be specified')
        elif iface and iface not in T2Utils.network_interfaces():
            raise OSError(f'Interface {iface} does not exist locally')

        # Private variables
        self._whitelisted_plugins = []
        self._default_plugin_folder = None
        self._tranalyzer2 = None
        self._plugins = dict()
        self._conn = None
        self._streaming = False

        # Input
        self.pcap = pcap
        self.iface = iface
        self.pcap_list = pcap_list

        # Output
        self.output_prefix = output_prefix
        self.packet_mode = packet_mode

        # Options
        self.bpf = bpf
        self.plugin_folder = plugin_folder
        self.loading_list = loading_list

        self.streaming = streaming

        if plugins:
            self.add_plugins(plugins)

        if output_format:
            self.add_output_format(output_format)

    def add_output_format(
            self,
            extension: Union[str, List[str]]
    ):
        """Select and configure the sink plugin(s) required to generate the requested output format.

        Parameters
        ----------
        extension : str or list of str
            `extension` may be any of:

            - `'bin'`       -> binSink
            - `'bin.gz'`    -> binSink + `GZ_COMPRESS = 1`
            - `'csv'`       -> txtSink + `SEP_CHR = ','`
            - `'csv.gz'`    -> txtSink + `SEP_CHR = ','` + `GZ_COMPRESS = 1`
            - `'json'`      -> jsonSink
            - `'json.gz'`   -> jsonSink + `GZ_COMPRESS = 1`
            - `'txt'`       -> txtSink
            - `'txt.gz'`    -> txtSink + `GZ_COMPRESS = 1`
            - `'netflow'`   -> netflowSink
            - `'mysql'`     -> mysqlSink
            - `'sqlite'`    -> sqliteSink
            - `'postgres'`  -> psqlSink
            - `'mongo'`     -> mongoSink
            - `'socket'`    -> socketSink
            - `'streaming'` -> socketSink + `CONTENT_TYPE = 2`
            - `'pcap'`      -> pcapd

        Raises
        ------
        RuntimeError
            If one of the specified extension is not supported.

        Examples
        --------
        >>> t2 = T2()
        >>> t2.add_output_format(['json.gz', 'txt'])
        """
        if type(extension) == list:
            extensions = extension
        else:
            extensions = [extension]
        for ext in extensions:
            if ext == 'bin':
                self.add_plugin('binSink')
            elif ext == 'bin.gz':
                self.add_plugin('binSink')
                self.binSink.GZ_COMPRESS = 1
            elif ext == 'csv':
                self.add_plugin('txtSink')
                self.txtSink.SEP_CHR = ','
            elif ext == 'csv.gz':
                self.add_plugin('txtSink')
                self.txtSink.SEP_CHR = ','
                self.txtSink.GZ_COMPRESS = 1
            elif ext == 'txt':
                self.add_plugin('txtSink')
            elif ext == 'txt.gz':
                self.add_plugin('txtSink')
                self.txtSink.GZ_COMPRESS = 1
            elif ext == 'json':
                self.add_plugin('jsonSink')
            elif ext == 'json.gz':
                self.add_plugin('jsonSink')
                self.jsonSink.GZ_COMPRESS = 1
            elif ext == 'netflow':
                self.add_plugin('netflowSink')
            elif ext == 'mysql':
                self.add_plugin('mysqlSink')
            elif ext == 'sqlite':
                self.add_plugin('sqliteSink')
            elif ext == 'postgres':
                self.add_plugin('psqlSink')
            elif ext == 'mongo':
                self.add_plugin('mongoSink')
            elif ext == 'socket':
                self.add_plugin('socketSink')
            elif ext == 'pcap':
                self.add_plugin('pcapd')
            elif ext == 'streaming':
                self.streaming = True
            else:
                raise RuntimeError(f"output format '{ext}' not supported")

    @property
    def default_plugin_folder(self) -> str:
        """Path to the default plugin folder, i.e., `$HOME/.tranalyzer/plugins`."""
        if not self._default_plugin_folder:
            self._default_plugin_folder = join(expanduser('~'), '.tranalyzer', 'plugins')
        return self._default_plugin_folder

    @property
    def tranalyzer2(self) -> T2Plugin:
        """`t2py.T2Plugin` object representing Tranalyzer2 (core)."""
        if not self._tranalyzer2:
            self._tranalyzer2 = T2Plugin('tranalyzer2')
        return self._tranalyzer2

    @property
    def loading_list(self):
        """Get or set the path to a plugin loading list.

        In addition, the setter calls the `T2.add_plugin()` function for each plugin listed in the file.
        """
        return self._loading_list

    @loading_list.setter
    def loading_list(self, loading_list: str):
        self._whitelisted_plugins.clear()
        self._loading_list = loading_list
        if loading_list:
            with open(loading_list) as f:
                for plugin in f:
                    plugin = plugin.strip()
                    if len(plugin) > 0 and not plugin.startswith('#'):
                        self.add_plugin(plugin)
                        self._whitelisted_plugins.append(plugin)

    @property
    def plugins(self) -> Dict[str, T2Plugin]:
        """Dictionary of `t2py.T2Plugin` objects whose key is the plugin name."""
        return self._plugins

    @property
    def t2_exec(self) -> str:
        """Path to the Tranalyzer2 executable."""
        if self.plugin_folder:
            return join(self.plugin_folder, 'bin', 'tranalyzer')
        else:
            return join(self.default_plugin_folder, 'bin', 'tranalyzer')

    @property
    def streaming(self):
        """Allow `tranalyzer2` to run in streaming mode (call `T2.stream()` to access flows one by one)."""
        return self._streaming

    @streaming.setter
    def streaming(self, activate: bool):
        if activate:
            self.add_plugin('socketSink')
            self.socketSink.CONTENT_TYPE = 2
        self._streaming = activate

    def headers_file(self) -> str:
        """Return the filename of the headers file.

        Returns
        -------
        str
            The filename of the headers file.

        See Also
        --------
        T2.headers : Return Tranalyzer2 `_headers.txt` file contents.
        T2.print_headers : Print the contents of Tranalyzer2 `_headers.txt` file.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
        >>> t2.run()
        >>> t2.headers_file()
        '/tmp/file_headers.txt'
        """
        output_prefix = self._output_prefix()
        return output_prefix + '_headers.txt' if output_prefix else None

    def flow_file_json(self) -> str:
        """Return the filename of the JSON flow file.

        Returns
        -------
        str
            The filename of the JSON flow file.

        See Also
        --------
        T2.flows_json : Return Tranalyzer2 `_flows.json` file contents as an array of JSON objets.
        T2.print_flows_json : Print the contents of Tranalyzer2 `_flows.json` file.
        T2.flow_file_txt : Return the filename of the TXT flow file.
        T2.flows_txt : Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objets.
        T2.print_flows_txt : Print the contents of Tranalyzer2 `_flows.txt` file.
        T2.flow_file : Return the filename of the JSON (preferred) or TXT flow file.
        T2.flows : Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objets.
        T2.print_flows : Print the contents of Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', output_format='json')
        >>> t2.run()
        >>> t2.flow_file_json()
        '/tmp/file_flows.json'
        """
        output_prefix = self._output_prefix()
        return output_prefix + '_flows.json' if output_prefix else None

    def flow_file_txt(self) -> str:
        """Return the filename of the TXT flow file.

        Returns
        -------
        str
            The filename of the TXT flow file.

        See Also
        --------
        T2.flows_txt : Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objets.
        T2.print_flows_txt : Print the contents of Tranalyzer2 `_flows.txt` file.
        T2.flow_file_json : Return the filename of the JSON flow file.
        T2.flows_json : Return Tranalyzer2 `_flows.json` file contents as an array of JSON objets.
        T2.print_flows_json : Print the contents of Tranalyzer2 `_flows.json` file.
        T2.flow_file : Return the filename of the JSON (preferred) or TXT flow file.
        T2.flows : Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objets.
        T2.print_flows : Print the contents of Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', output_format='csv')
        >>> t2.run()
        >>> t2.flow_file_txt()
        '/tmp/file_flows.txt'
        """
        output_prefix = self._output_prefix()
        return output_prefix + '_flows.txt' if output_prefix else None

    def flow_file(self) -> str:
        """Return the filename of the JSON (preferred) or TXT flow file.

        Returns
        -------
        str
            The filename of the flow file.

        See Also
        --------
        T2.flows : Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objets.
        T2.print_flows : Print the contents of Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file.
        T2.flow_file_json : Return the filename of the JSON flow file.
        T2.flows_json : Return Tranalyzer2 `_flows.json` file contents as an array of JSON objets.
        T2.print_flows_json : Print the contents of Tranalyzer2 `_flows.json` file.
        T2.flow_file_txt : Return the filename of the TXT flow file.
        T2.flows_txt : Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objets.
        T2.print_flows_txt : Print the contents of Tranalyzer2 `_flows.txt` file.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', output_format=['json', 'txt'])
        >>> t2.run()
        >>> t2.flow_file()
        '/tmp/file_flows.json'
        """
        if 'jsonSink' in self._plugins:
            return self.flow_file_json()
        elif 'txtSink' in self._plugins:
            return self.flow_file_txt()
        else:
            return None

    def packet_file(self) -> str:
        """Return the filename of the packet file.

        Returns
        -------
        str
            The filename of the packet file.

        See Also
        --------
        T2.packets : Return Tranalyzer2 `_packets.txt` file contents as an array of JSON objets.
        T2.print_packets : Print the contents of Tranalyzer2 `_packets.txt`.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap')
        >>> t2.run(packet_mode=True)
        >>> t2.packet_file()
        '/tmp/file_packets.txt'
        """
        output_prefix = self._output_prefix()
        return output_prefix + '_packets.txt' if output_prefix else None

    def log_file(self) -> str:
        """Return the filename of the log file.

        Returns
        -------
        str
            The filename of the log file.

        See Also
        --------
        T2.log : Return Tranalyzer2 `_log.txt` file contents.
        T2.print_log : Print the contents of Tranalyzer2 `_log.txt` file.
        T2.report : Alias for `T2.log()`.
        T2.print_report : Alias for `T2.print_log()`.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', output_format='txt')
        >>> t2.run()
        >>> t2.log_file()
        '/tmp/file_log.txt'
        """
        output_prefix = self._output_prefix()
        return output_prefix + '_log.txt' if output_prefix else None

    # ====================================================================== #

    def status(self):
        """Print the current configuration status of Tranalyzer2 and the plugins (sorted by plugin numbers).

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', output_format='csv.gz')
        >>> t2.status()
        """
        if self._tranalyzer2:
            t2 = self.tranalyzer2
            pending_changes = len(t2.changes)
            if pending_changes > 0:
                print(f'Tranalyzer2 [{pending_changes} change{"s" if pending_changes > 1 else ""} pending]:')
                for name, value in t2.changes.items():
                    print(f'    {name} = {value}')
                print()
        print('Plugins: ')
        num_plugins = 0
        for plugin_name in sorted(self._plugins, key=lambda name: self._plugins[name].number):
            plugin = self._plugins[plugin_name]
            pending_changes = len(plugin.changes)
            print(f'    {num_plugins+1}: {plugin_name}', end='')
            if pending_changes == 0:
                if len(self._whitelisted_plugins) > 0 and plugin_name not in self._whitelisted_plugins:
                    print(' [NOT IN LOADING LIST]')
                else:
                    print()
            else:
                print(f' [{pending_changes} change{"s" if pending_changes > 1 else ""} pending]', end='')
                if len(self._whitelisted_plugins) > 0 and plugin_name not in self._whitelisted_plugins:
                    print(' [NOT IN LOADING LIST]')
                else:
                    print()
                for name, value in plugin.changes.items():
                    print(f'            {name} = {value}')
            num_plugins += 1
        if num_plugins == 0:
            print('    None')
        print('\nTranalyzer options:')
        if self.pcap:
            print(f'    -r {self.pcap}')
        elif self.iface:
            print(f'    -i {self.iface}')
        elif self.pcap_list:
            if type(self.pcap_list) == list:
                print('    -R /tmp/pcap_list.txt')
            else:
                print(f'    -R {self.pcap_list}')
        if self.output_prefix:
            print(f'    -w {self.output_prefix}')
        if self.loading_list:
            print(f'    -b {self.loading_list}')
        if self.plugin_folder:
            print(f'    -p {self.plugin_folder}')
        if self.packet_mode:
            print('    -s')
        print('    -l')
        if self.bpf:
            print(f'    BPF filter: {self.bpf}')

    # ====================================================================== #

    def clear_plugins(self):
        """Clear the list of plugins.

        Examples
        --------
        >>> t2 = T2(plugins=['arpDecode', 'basicStats'])
        >>> t2.clear_plugins()
        >>> t2.list_plugins()
        []
        """
        self._remove_dict_entries(list(self._plugins.keys()))
        self._plugins.clear()

    def set_plugins(
            self,
            plugins: List[str] = None
    ):
        """Overwrite the current list of plugins.

        Parameters
        ----------
        plugins : list of str, default: None
            The list of plugins to use.
            If `None`, empty the list of plugins.

        See Also
        --------
        T2.list_plugins : Alphabetically list the currently active plugins.

        Examples
        --------
        >>> t2 = T2()
        >>> t2.set_plugins(['arpDecode', 'basicStats'])
        >>> t2.list_plugins()
        ['arpDecode', 'basicStats']
        >>> type(t2.arpDecode)
        <class 't2py.T2Plugin.T2Plugin'>
        >>> t2.basicStats.description
        'Basic statistics'
        """
        self.clear_plugins()
        if plugins:
            self._plugins = {plugin: T2Plugin(plugin) for plugin in plugins}
            self._add_dict_entries(plugins)

    def add_plugin(
            self,
            plugin: str
    ):
        """Add a plugin to the list of plugins.

        Parameters
        ----------
        plugin : str
            The name of the plugin to add.

        See Also
        --------
        T2.add_plugins : Add a list of plugins to the list of plugins.
        T2.remove_plugin : Remove a plugin from the list of plugins.
        T2.remove_plugins : Remove a list of plugins from the list of plugins.

        Examples
        --------
        >>> t2 = T2()
        >>> t2.add_plugin('arpDecode')
        >>> type(t2.arpDecode)
        <class 't2py.T2Plugin.T2Plugin'>
        >>> t2.arpDecode.number
        '179'
        """
        if plugin not in self._plugins:
            self._plugins[plugin] = T2Plugin(plugin)
            self._add_dict_entry(plugin)

    def add_plugins(
            self,
            plugins: List[str]
    ):
        """Add a list of plugins to the list of plugins.

        Parameters
        ----------
        plugins : list of str
            The list of plugins to add.

        See Also
        --------
        T2.add_plugin : Add a plugin to the list of plugins.
        T2.remove_plugin : Remove a plugin from the list of plugins.
        T2.remove_plugins : Remove a list of plugins from the list of plugins.

        Examples
        --------
        >>> t2 = T2()
        >>> t2.add_plugins(['arpDecode', 'basicStats'])
        >>> type(t2.arpDecode)
        <class 't2py.T2Plugin.T2Plugin'>
        >>> t2.basicStats.description
        'Basic statistics'
        """
        for plugin in plugins:
            self.add_plugin(plugin)

    def remove_plugin(
            self,
            plugin: str
    ):
        """Remove a plugin from the list of plugins.

        Parameters
        ----------
        plugin : str
            The name of the plugin to remove.

        See Also
        --------
        T2.add_plugin : Add a plugin to the list of plugins.
        T2.add_plugins : Add a list of plugins to the list of plugins.
        T2.remove_plugins : Remove a list of plugins from the list of plugins.

        Examples
        --------
        >>> t2 = T2(plugins=['arpDecode', 'basicStats'])
        >>> t2.remove_plugin('arpDecode')
        >>> t2.list_plugins()
        ['basicStats']
        """
        self._remove_dict_entry(plugin)
        self._plugins.pop(plugin)

    def remove_plugins(
            self,
            plugins: List[str]
    ):
        """Remove a list of plugins from the list of plugins.

        Parameters
        ----------
        plugins : list of str
            The list of plugins to remove.

        See Also
        --------
        T2.add_plugin : Add a plugin to the list of plugins.
        T2.add_plugins : Add a list of plugins to the list of plugins.
        T2.remove_plugin : Remove a plugin from the list of plugins.

        Examples
        --------
        >>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags'])
        >>> t2.remove_plugins(['arpDecode', 'basicStats'])
        >>> t2.list_plugins()
        ['tcpFlags']
        """
        for plugin in plugins:
            self.remove_plugin(plugin)

    # =========================================================================

    def list_plugins(self):
        """Alphabetically list the loaded plugins.

        Examples
        --------
        >>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags'])
        >>> t2.list_plugins()
        ['arpDecode', 'basicStats', 'tcpFlags']
        >>> t2.arpDecode.description
        'Address Resolution Protocol (ARP)'
        """
        return sorted(list(self._plugins.keys()))

    def create_plugin_list(
            self,
            plugins: List[str] = None,
            outfile: str = None,
            verbose: bool = False
    ):
        """Create a loading list of plugins.

        Parameters
        ----------
        plugins : list of str, default: None
            List of plugin names.
            If `None`, use the currently loaded plugins.
        outfile : str, default: None
            Name of the file to save.
            If `None`, default to `'plugins.load'` in the plugin folder.
        verbose : bool, default: False

            - If `True`, print the output (`stdout` and `stderr`) of the command.
            - If `False`, do not print anything.

        Raises
        ------
        subprocess.CalledProcessError
            If the process exited with a non-zero exit code.

        Examples
        --------
        >>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags'])
        >>> t2.create_plugin_list()
        >>> t2.create_plugin_list(outfile='/tmp/myPlugins.txt')
        >>> t2.create_plugin_list(['basicFlow', 'txtSink'])
        >>> t2.create_plugin_list(['basicFlow', 'txtSink'], '/tmp/myPlugins.txt')
        """
        if not outfile:
            # Same folder resolution as `t2_exec`: configured folder, else default
            folder = self.plugin_folder or self.default_plugin_folder
            outfile = join(folder, 'plugins.load')
        if not plugins:
            plugins = list(self._plugins)
        T2Utils.create_plugin_list(plugins, outfile, verbose=verbose)

    # =========================================================================

    def build(
            self,
            plugin: Union[str, List[str]] = None,
            plugin_folder: str = None,
            force_rebuild: bool = False,
            debug: bool = False,
            verbose: bool = False
    ):
        """Build Tranalyzer2 and the plugins.

        Parameters
        ----------
        plugin : str or list of str, default: None
            List of plugin names to build or path to a plugin loading list.
            If `None`, build the currently loaded plugins and Tranalyzer2 itself.
        plugin_folder : str, default: None
            Path to the plugin folder.
            If `None`, use `T2.plugin_folder`.
        force_rebuild : bool, default: False
            Force the rebuild of the configuration files (`$ t2build -r ...`).
        debug : bool, default: False
            Build the plugin in debug mode.
        verbose : bool, default: False

            - If `True`, print the output (`stdout` and `stderr`) of the command.
            - If `False`, do not print anything.

        Raises
        ------
        subprocess.CalledProcessError
            If the process exited with a non-zero exit code.

        See Also
        --------
        T2.clean : Remove files generated by `T2.build()`.

        Examples
        --------
        >>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags'])
        >>> t2.build()
        """
        folder = plugin_folder or self.plugin_folder
        if plugin:
            targets = plugin
        else:
            # No explicit selection: every loaded plugin plus the core
            targets = [*self._plugins, 'tranalyzer2']
        T2Utils.build(targets, folder, force_rebuild, debug, verbose=verbose)

    def clean(
            self,
            plugin: Union[str, List[str]] = None,
            verbose: bool = False
    ):
        """Clean Tranalyzer2 and the plugins.

        Parameters
        ----------
        plugin : str or list of str, default: None
            List of plugin names to clean, or path to a plugin loading list.
            If `None`, clean the currently loaded plugins and Tranalyzer2 itself.
        verbose : bool, default: False

            - If `True`, print the output (`stdout` and `stderr`) of the command.
            - If `False`, do not print anything.

        Raises
        ------
        subprocess.CalledProcessError
            If the process exited with a non-zero exit code.

        See Also
        --------
        T2.build : Build Tranalyzer2 and the plugins.

        Examples
        --------
        >>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags'])
        >>> t2.build()
        >>> t2.clean()
        """
        if plugin:
            targets = plugin
        else:
            # No explicit selection: every loaded plugin plus the core
            targets = [*self._plugins, 'tranalyzer2']
        T2Utils.clean(targets, verbose=verbose)

    def unload(
            self,
            plugin: Union[str, List[str]] = None,
            plugin_folder: str = None,
            verbose: bool = False
    ):
        """Remove (unload) plugin(s) from the plugin folder.

        Parameters
        ----------
        plugin : str or list of str, default: None
            `plugin` can be:

            - a single plugin, e.g., `'pluginName'`
            - a list of plugins, e.g., `['pluginName1', 'pluginName2']`
            - a plugin loading list, e.g., `'myPlugins.txt'`
            - `all` to apply the operation to all available plugins
            - If `None`, use the currently loaded plugins.

        plugin_folder : str, default: None
            The plugin folder from where the plugin must be unloaded.
            If `None`, use `T2.plugin_folder`.
        verbose : bool, default: False

            - If `True`, print the output (`stdout` and `stderr`) of the command.
            - If `False`, do not print anything.

        Raises
        ------
        subprocess.CalledProcessError
            If the process exited with a non-zero exit code.

        Examples
        --------
        >>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags'])
        >>> t2.build()
        >>> t2.unload()
        """
        folder = plugin_folder or self.plugin_folder
        targets = plugin or list(self._plugins)
        T2Utils.unload(targets, folder, verbose=verbose)

    def apply_changes(
            self,
            verbose: bool = False
    ):
        """Apply/persist the current configuration to the source files.

        Parameters
        ----------
        verbose : bool, default: False

            - If `True`, print the output (`stdout` and `stderr`) of the command.
            - If `False`, do not print anything.

        Raises
        ------
        subprocess.CalledProcessError
            If one of the processes exited with a non-zero exit code.

        Examples
        --------
        >>> t2 = T2(output_format='csv.gz')
        >>> t2.apply_changes()
        """
        self.tranalyzer2.apply_changes(verbose=verbose)
        for plugin_name, plugin in self._plugins.items():
            plugin.apply_changes(verbose=verbose)

    def discard_changes(self):
        """Discard all non-committed changes made in this session.

        Examples
        --------
        >>> t2 = T2(plugins=['arpDecode', 'txtSink'])
        >>> t2.arpDecode.MAX_IP = 5
        >>> t2.discard_changes()
        """
        for plugin_name, plugin in self._plugins.items():
            plugin.discard_changes()

    def reset(self):
        """Reset all plugins' flags to their default value.

        Examples
        --------
        >>> t2 = T2(plugins=['arpDecode', 'txtSink'])
        >>> t2.arpDecode.MAX_IP = 5
        >>> t2.apply_config()
        >>> t2.reset()
        >>> t2.apply_config()
        """
        for plugin_name, plugin in self._plugins.items():
            plugin.reset()

    def _setup_streaming(self):
        """Start a background thread which accepts the connection from socketSink.

        The thread listens on the address and port configured in the socketSink
        plugin (`SERVADD`, `DPORT`), accepts a single connection and stores the
        connected socket in `self._conn`.
        """
        # Internal invariant: callers must have loaded socketSink beforehand
        assert 'socketSink' in self._plugins

        # start socket listener in separate thread
        def _socket_sink_listener(self):
            import socket
            # The context manager closes the listening socket even if bind(),
            # listen() or accept() raises, so no file descriptor is leaked
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                s.bind((self.socketSink.SERVADD, self.socketSink.DPORT))
                s.listen(1)
                self._conn, _ = s.accept()

        print(f'Setting up streaming server {self.socketSink.SERVADD}:{self.socketSink.DPORT}')
        thread = threading.Thread(target=_socket_sink_listener, args=(self,))
        thread.daemon = True  # do not keep the interpreter alive while waiting for a connection
        thread.start()

    def run(
            self,
            pcap: str = None,
            iface: str = None,
            pcap_list: Union[str, List[str]] = None,
            output_prefix: str = None,
            packet_mode: bool = False,
            plugins: List[str] = None,
            plugin_folder: str = None,
            loading_list: str = None,
            bpf: str = None,
            rebuild: bool = False,
            streaming: bool = False,
            timeout: int = None,
            verbose: bool = False
    ):
        """Run Tranalyzer2.

        Every argument which is set overrides (and is stored in) the corresponding attribute
        of this object before Tranalyzer2 is started.

        In streaming mode, Tranalyzer2 is started in a background daemon thread and this
        function returns immediately; use `T2.stream()` to consume the flows.

        Parameters
        ----------
        pcap : str, default: None
            Path to a pcap file.
        iface : str, default: None
            Name of a network interface.
        pcap_list : str or list of str, default: None
            Path to a list of pcap files, e.g., `'/tmp/myPcaps.txt'`.
            Or list of path to pcap files, e.g., `['file1.pcap', 'file2.pcap']`.
        output_prefix : str, default: None
            If `None`, automatically derived from input.
        packet_mode : bool, default: False
            Activate Tranalyzer2 packet mode.
        plugin_folder : str, default: None
            Path to the plugin folder.
        plugins : list of str, default: None
            The list of plugins to use.
            If `None`, use `T2.list_plugins()`.
        loading_list : str, default: None
            Path to a plugin loading list.
            If `None`, create one using `T2.list_plugins()`.
        bpf : str, default: None
            A BPF filter.
        rebuild : bool, default: False
            Apply the current changes and rebuild Tranalyzer2 and the plugins.
        streaming: bool, default: False
            If `True`, then `T2.stream()` can be used to yield one flow at a time.
        timeout : int, default: None
            Number of seconds after which to terminate the process.
            If `None`, run forever or until the end of file is reached.
        verbose : bool, default: False

            - If `True`, print the output (`stdout` and `stderr`) of the command.
            - If `False`, do not print anything.

        Raises
        ------
        OSError
            If specified interface `iface` does not exist locally.
        RuntimeError

            - If none or more than one input (`pcap`, `pcap_list`, `iface`) is specified.
            - If socketSink plugin is not loaded or misconfigured when `T2.streaming` is `True`.

        subprocess.CalledProcessError
            If the process exited with a non-zero exit code.
        subprocess.TimeoutExpired
            If `timeout` was specified and the process ran longer than that.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', output_format='txt')
        >>> t2.build()
        >>> t2.run()
        >>> t2.run(packet_mode=True)
        """
        if (pcap and pcap_list) or (pcap and iface) or (iface and pcap_list):
            raise RuntimeError('Only one input (pcap, pcap_list, iface) may be specified')

        if (
            not pcap and not self.pcap and
            not iface and not self.iface and
            not pcap_list and not self.pcap_list
        ):
            raise RuntimeError('An input (pcap, pcap_list or iface) is required')

        # NOTE(review): an input of a different kind configured earlier is not cleared here,
        # e.g., `self.pcap` stays set when `iface` is supplied. Confirm that
        # `T2Utils.run_tranalyzer()` copes with several inputs being set.
        if pcap:
            self.pcap = pcap
        elif iface:
            self.iface = iface
        elif pcap_list:
            self.pcap_list = pcap_list

        if output_prefix:
            self.output_prefix = output_prefix

        if plugin_folder:
            self.plugin_folder = plugin_folder

        if loading_list:
            self.loading_list = loading_list
        elif not plugins:
            plugins = list(self._plugins)

        if packet_mode:
            self.packet_mode = packet_mode

        if bpf:
            self.bpf = bpf

        if streaming or self.streaming:
            if 'socketSink' not in self._plugins:
                raise RuntimeError('socketSink plugin is required for running in streaming mode')
            elif (self.socketSink.SOCKTYPE != 1 or
                  self.socketSink.CONTENT_TYPE != 2 or
                  self.socketSink.GZ_COMPRESS != 0):
                raise RuntimeError("Invalid configuration for socketSink in streaming mode: requires 'SOCKTYPE=1', 'CONTENT_TYPE=2' and 'GZ_COMPRESS=0'")
            else:
                self.streaming = True

        if rebuild:
            if not self.loading_list:
                self.apply_changes()
                self.build()
            else:
                self.tranalyzer2.apply_changes()
                self.tranalyzer2.build()
                with open(self.loading_list) as f:
                    for line in f:
                        plugin = line.strip()
                        # Blank lines and (presumably) '#' comment lines do not name a plugin
                        if not plugin or plugin.startswith('#'):
                            continue
                        T2Utils.apply_config(plugin)
                self.build(plugin=self.loading_list)

        def _run_tranalyzer(self):
            # `plugins`, `timeout` and `verbose` are captured from the enclosing call
            T2Utils.run_tranalyzer(
                t2_exec=self.t2_exec,
                pcap=self.pcap,
                iface=self.iface,
                pcap_list=self.pcap_list,
                output_prefix=self.output_prefix,
                log_file=True,
                packet_mode=self.packet_mode,
                plugin_folder=self.plugin_folder,
                bpf=self.bpf,
                loading_list=self.loading_list,
                plugins=plugins,
                timeout=timeout,
                verbose=verbose)

        if not self.streaming:
            _run_tranalyzer(self)
        else:
            self._setup_streaming()
            thread = threading.Thread(target=_run_tranalyzer, args=(self,))
            thread.daemon = True  # daemon thread: does not keep the interpreter alive on exit
            thread.start()

    # =========================================================================
    # Access _headers.txt, _flows.txt, _packets.txt and _log.txt
    # =========================================================================

    def stream(self) -> Iterator[Dict[str, Any]]:
        """Iterator returning flow by flow.

        Reads newline-delimited JSON records from the socketSink connection until the peer
        closes it. A last record which is not terminated by a newline is yielded as well.

        Raises
        ------
        RuntimeError

            - If the function was called, without configuring streaming mode first.
            - If socketSink did not open connection.

        json.JSONDecodeError
            If a received line is not valid JSON.

        Yields
        ------
        dict
            The next flow represented as a dict with the column names as key.

        Examples
        --------
        >>> t2 = T2(streaming=True, plugins=['basicFlow'])
        >>> t2.run(pcap='file.pcap')
        >>> for flow in t2.stream():
        ...     print(f"{flow['srcIP']}:{flow['srcPort']} -> {flow['dstIP']}:{flow['dstPort']}")
        """
        if not self.streaming:
            raise RuntimeError("Calling 'stream' without configuring streaming mode first.")
        elif not self._conn:
            time.sleep(2)  # wait for tranalyzer to start and socketSink to connect

        if not self._conn:
            raise RuntimeError('socketSink did not open connection')

        buf = b''
        while True:
            chunk = self._conn.recv(1024)
            if not chunk:
                # An empty read means the peer closed the connection. Testing the chunk
                # (not the buffer) avoids spinning forever on a leftover partial line.
                break
            buf += chunk
            # Everything before the last newline is complete; keep the remainder for later
            *lines, buf = buf.split(b'\n')
            for line in lines:
                yield json.loads(line)

        # Flush a final record which was not terminated by a newline
        if buf.strip():
            yield json.loads(buf)

    def __enter__(self):
        """Enter a `with` block and return this object itself."""
        return self

    def __exit__(self, type, value, traceback):
        """Leave a `with` block: close the streaming connection if one is open.

        Returns `None`, so an exception raised inside the `with` block is not suppressed.
        """
        conn = self._conn
        if conn:
            conn.close()

    def _load_file(self, filename: str) -> str:
        """Read a headers/flows/packets/log file into memory.

        Parameters
        ----------
        filename : str
            The name of the file to load.

        Returns
        -------
        str
            The whole contents of `filename`, or `None` if `filename` is empty
            or is not an existing regular file.
        """
        if filename and isfile(filename):
            with open(filename) as fh:
                return fh.read()
        return None

    def headers(self) -> str:
        """Load and return the contents of Tranalyzer2 `_headers.txt` file.

        Returns
        -------
        str
            The raw contents of the header file.

        Raises
        ------
        RuntimeError
            If the `_headers.txt` file could not be found.

        See Also
        --------
        T2.headers_file : Return the filename of the headers file.
        T2.print_headers : Print the contents of Tranalyzer2 `_headers.txt` file.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
        >>> t2.build()
        >>> t2.run()
        >>> t2.headers()
        '# Date: 1628090651.486362 sec (Wed 04 Aug 2021 17:24:11 CEST)...'
        """
        path = self.headers_file()
        if path and isfile(path):
            return self._load_file(path)
        raise RuntimeError('Run Tranalyzer with the txtSink plugin to create a headers file first')

    def flows_json(self) -> List[Dict[str, Any]]:
        """Load Tranalyzer2 `_flows.json` file and return its rows as an array of JSON objects.

        Returns
        -------
        list of dict
            The list of rows contained in the JSON flow file.
            Each row is represented as a dict with the column names as key.

        Raises
        ------
        RuntimeError
            If the `_flows.json` file could not be found.

        See Also
        --------
        T2.flow_file_json : Return the filename of the JSON flow file.
        T2.flows : Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objects.
        T2.flows_txt : Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objects.
        T2.flow_file : Return the filename of the JSON (preferred) or TXT flow file.
        T2.flow_file_txt : Return the filename of the TXT flow file.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'jsonSink'])
        >>> t2.build()
        >>> t2.run()
        >>> t2.flows_json()
        [{'dir': 'A', 'flowInd': 1, ...}, {'dir': 'B', 'flowInd': 1, ...}, ...]
        """
        path = self.flow_file_json()
        if path and isfile(path):
            return T2Utils.to_json_array(path)
        raise RuntimeError('Run Tranalyzer with the jsonSink plugin to create a JSON flow file first')

    def flows_txt(
            self,
            delimiter: str = None
    ) -> List[Dict[str, Any]]:
        """Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objects.

        Parameters
        ----------
        delimiter : str, default: None
            The delimiter used to separate columns in the input file.
            If `None`, default to '\\t' if the txtSink plugin is not loaded else to the value of `txtSink.SEP_CHR`.

        Returns
        -------
        list of dict
            The list of rows contained in the TXT flow file.
            Each row is represented as a dict with the column names as key.

        Raises
        ------
        RuntimeError
            If the `_flows.txt` file could not be found.

        See Also
        --------
        T2.flow_file_txt : Return the filename of the TXT flow file.
        T2.flows : Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objects.
        T2.flows_json : Return Tranalyzer2 `_flows.json` file contents as an array of JSON objects.
        T2.flow_file : Return the filename of the JSON (preferred) or TXT flow file.
        T2.flow_file_json : Return the filename of the JSON flow file.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
        >>> t2.build()
        >>> t2.run()
        >>> t2.flows_txt()
        [{'dir': 'A', 'flowInd': 1, ...}, {'dir': 'B', 'flowInd': 1, ...}, ...]
        """
        name = self.flow_file_txt()
        if not name or not isfile(name):
            raise RuntimeError('Run Tranalyzer with the txtSink plugin to create a flow file first')
        if delimiter is None:
            if 'txtSink' not in self._plugins:
                delimiter = '\t'
            else:
                # SEP_CHR may be wrapped in double quotes and use a C-style escape sequence
                delimiter = self.txtSink.SEP_CHR.strip('"')
                escapes = {'\\t': '\t', '\\n': '\n', '\\r': '\r', '\\\\': '\\'}
                # Slicing (instead of indexing) copes with an empty value or a lone
                # backslash; anything which is not a known escape is kept unchanged.
                delimiter = escapes.get(delimiter[:2], delimiter)
        return T2Utils.to_json_array(name, delimiter)

    def flows(self) -> List[Dict[str, Any]]:
        """Return the flows from Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file as an array of JSON objects.

        The choice is made on the loaded plugins: jsonSink first, then txtSink.

        Returns
        -------
        list of dict
            The list of rows contained in the flow file.
            Each row is represented as a dict with the column names as key.

        Raises
        ------
        RuntimeError
            If neither the `_flows.json` nor the `_flows.txt` file could be found.

        See Also
        --------
        T2.flow_file : Return the filename of the JSON (preferred) or TXT flow file.
        T2.flow_file_json : Return the filename of the JSON flow file.
        T2.flow_file_txt : Return the filename of the TXT flow file.
        T2.flows_json : Return Tranalyzer2 `_flows.json` file contents as an array of JSON objects.
        T2.flows_txt : Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objects.
        T2.print_flows : Print the contents of Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file.
        T2.print_flows_json : Print the contents of Tranalyzer2 `_flows.json` file.
        T2.print_flows_txt : Print the contents of Tranalyzer2 `_flows.txt` file.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
        >>> t2.build()
        >>> t2.run()
        >>> t2.flows()
        [{'dir': 'A', 'flowInd': 1, ...}, {'dir': 'B', 'flowInd': 1, ...}, ...]
        """
        loaded = self._plugins
        if 'jsonSink' in loaded:
            return self.flows_json()
        if 'txtSink' in loaded:
            return self.flows_txt()
        raise RuntimeError('Run Tranalyzer with the txtSink or jsonSink plugin to create a flow file first')

    def packets(self) -> List[Dict[str, Any]]:
        """Load Tranalyzer2 `_packets.txt` file and return its rows as an array of JSON objects.

        Returns
        -------
        list of dict
            The list of rows contained in the packet file.
            Each row is represented as a dict with the column names as key.

        Raises
        ------
        RuntimeError
            If the `_packets.txt` file could not be found.

        See Also
        --------
        T2.packet_file : Return the filename of the packet file.
        T2.print_packets : Print the contents of Tranalyzer2 `_packets.txt` file.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'], packet_mode=True)
        >>> t2.build()
        >>> t2.run()
        >>> t2.packets()
        [{'pktNo': 1, 'flowInd': 1, ...}, {'pktNo': 2, 'flowInd': 1, ...}, ...]
        """
        path = self.packet_file()
        if path and isfile(path):
            return T2Utils.to_json_array(path)
        raise RuntimeError("Run Tranalyzer with 'packet_mode=True' to create a packet file first")

    def report(self) -> str:
        """Load and return the contents of Tranalyzer2 `_log.txt` file.

        Returns
        -------
        str
            The raw contents of the log file.

        Raises
        ------
        RuntimeError
            If the `_log.txt` file could not be found.

        See Also
        --------
        T2.log_file : Return the filename of the log file.
        T2.log : Alias for `T2.report()`

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
        >>> t2.build()
        >>> t2.run()
        >>> t2.report()
        '================================================================================...'
        """
        path = self.log_file()
        if path and isfile(path):
            return self._load_file(path)
        raise RuntimeError("Run Tranalyzer to create a log file first")

    def log(self) -> str:
        """Return Tranalyzer2 `_log.txt` file contents (alias for `T2.report()`).

        Returns
        -------
        str
            The raw contents of the log file.

        Raises
        ------
        RuntimeError
            If the `_log.txt` file could not be found.

        See Also
        --------
        T2.report : Return Tranalyzer2 `_log.txt` file contents.
        T2.log_file : Return the filename of the log file.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
        >>> t2.build()
        >>> t2.run()
        >>> t2.log()
        '================================================================================...'
        """
        # Thin alias: all the work (and error handling) is done by report()
        return self.report()

    # =========================================================================
    # Print _headers.txt, _flows.txt, _packets.txt and _log.txt files contents
    # =========================================================================

    def print_headers(self):
        """Write the contents of Tranalyzer2 `_headers.txt` file to standard output.

        Raises
        ------
        RuntimeError
            If the header file could not be found.

        See Also
        --------
        T2.headers_file : Return the filename of the headers file.
        T2.headers : Return Tranalyzer2 `_headers.txt` file contents.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
        >>> t2.build()
        >>> t2.run()
        >>> t2.print_headers()
        # Date: 1628090651.486362 sec (Wed 04 Aug 2021 17:24:11 CEST)
        # Tranalyzer 0.8.11 (Anteater), Tarantula.
        ...
        """
        path = self.headers_file()
        if not (path and isfile(path)):
            raise RuntimeError('Run Tranalyzer with the txtSink plugin to create a headers file first')
        with open(path) as fh:
            contents = fh.read()
        print(contents)

    def print_flows(self):
        """Print the contents of Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file.

        The file to print is the one returned by `T2.flow_file()`.

        Raises
        ------
        RuntimeError
            If the flow file could not be found.

        See Also
        --------
        T2.flow_file : Return the filename of the JSON (preferred) or TXT flow file.
        T2.flows : Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objects.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
        >>> t2.build()
        >>> t2.run()
        >>> t2.print_flows()
        %dir    flowInd ...
        A       1       ...
        B       1       ...
        ...
        """
        name = self.flow_file()
        if not name or not isfile(name):
            # Either sink can produce the file, so name both (same wording as T2.flows())
            raise RuntimeError('Run Tranalyzer with the txtSink or jsonSink plugin to create a flow file first')
        with open(name) as f:
            print(f.read())

    def print_flows_json(self):
        """Write the contents of Tranalyzer2 `_flows.json` file to standard output.

        Raises
        ------
        RuntimeError
            If the JSON flow file could not be found.

        See Also
        --------
        T2.flow_file_json : Return the filename of the JSON flow file.
        T2.flows_json : Return Tranalyzer2 `_flows.json` file contents as an array of JSON objects.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'jsonSink'])
        >>> t2.build()
        >>> t2.run()
        >>> t2.print_flows_json()
        {"dir":"A","flowInd":1,...}
        {"dir":"B","flowInd":1,...}
        ...
        """
        path = self.flow_file_json()
        if not (path and isfile(path)):
            raise RuntimeError('Run Tranalyzer with the jsonSink plugin to create a JSON flow file first')
        with open(path) as fh:
            contents = fh.read()
        print(contents)

    def print_flows_txt(self):
        """Write the contents of Tranalyzer2 `_flows.txt` file to standard output.

        Raises
        ------
        RuntimeError
            If the flow file could not be found.

        See Also
        --------
        T2.flow_file_txt : Return the filename of the TXT flow file.
        T2.flows_txt : Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objects.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
        >>> t2.build()
        >>> t2.run()
        >>> t2.print_flows_txt()
        %dir    flowInd ...
        A       1       ...
        B       1       ...
        ...
        """
        path = self.flow_file_txt()
        if not (path and isfile(path)):
            raise RuntimeError('Run Tranalyzer with the txtSink plugin to create a flow file first')
        with open(path) as fh:
            contents = fh.read()
        print(contents)

    def print_packets(self):
        """Print the contents of Tranalyzer2 `_packets.txt` file.

        Raises
        ------
        RuntimeError
            If the packet file could not be found.

        See Also
        --------
        T2.packet_file : Return the filename of the packet file.
        T2.packets : Return Tranalyzer2 `_packets.txt` file contents as an array of JSON objects.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
        >>> t2.build()
        >>> t2.run(packet_mode=True)
        >>> t2.print_packets()
        %pktNo  flowInd ...
        1       1       ...
        2       1       ...
        ...
        """
        name = self.packet_file()
        if not name or not isfile(name):
            # Same wording as T2.packets(): packet mode is an option, not a plugin
            raise RuntimeError("Run Tranalyzer with 'packet_mode=True' to create a packet file first")
        with open(name) as f:
            print(f.read())

    def print_log(self):
        """Write the contents of Tranalyzer2 `_log.txt` file to standard output.

        Raises
        ------
        RuntimeError
            If the log file could not be found.

        See Also
        --------
        T2.print_report : Alias for `T2.print_log()`.
        T2.log_file : Return the filename of the log file.
        T2.log : Return Tranalyzer2 `_log.txt` file contents.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
        >>> t2.build()
        >>> t2.run()
        >>> t2.print_log()
        """
        path = self.log_file()
        if not (path and isfile(path)):
            raise RuntimeError("Run Tranalyzer to create a log file first")
        with open(path) as fh:
            contents = fh.read()
        print(contents)

    def print_report(self):
        """Print the contents of Tranalyzer2 `_log.txt` file (alias for `T2.print_log()`).

        Raises
        ------
        RuntimeError
            If the log file could not be found.

        See Also
        --------
        T2.print_log : Print the contents of Tranalyzer2 `_log.txt` file.
        T2.log_file : Return the filename of the log file.
        T2.log : Return Tranalyzer2 `_log.txt` file contents.
        T2.report : Alias for `T2.log()`.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
        >>> t2.build()
        >>> t2.run()
        >>> t2.print_report()
        """
        # Thin alias: all the work (and error handling) is done by print_log()
        self.print_log()

    # =========================================================================

    def follow_stream(
            self,
            flow: int,
            output_format: Union[int, str] = 2,
            direction: str = None,
            payload_format: Union[int, str] = 4,
            reassembly: bool = True,
            colors: bool = True
    ) -> Union[str, List[str], List[bytearray], List[Dict[str, Any]]]:
        """Return the reassembled payload of the flow with specified index (`flow`) and `direction`.

        The payload is extracted from the packet file (see `T2.packet_file()`) by
        `t2py.T2Utils.T2Utils.follow_stream()`, to which all the arguments are forwarded.

        Parameters
        ----------
        flow : int
            Index (`flowInd`) of the flow to analyse.
        output_format : int or str, default: 2
            The format to use for the output:

            - 0 or `'payload_only'`
            - 1 or `'info'`: Prefix each payload with packet/flow info
            - 2 or `'json'`
            - 3 or `'reconstruct'`

        direction : str, default: None
            The direction to follow (`'A'` or `'B'`).
            Use `None` to follow both directions.
        payload_format : int or str, default: 4
            The format to use for the payload:

            - 0 or 'ascii'
            - 1 or 'hexdump'
            - 2, 'raw' or 'binary'
            - 3 or 'base64'
            - 4 or 'bytearray'

        reassembly : bool, default: True
            Analyze TCP sequence numbers, reassemble and reorder TCP segments.
        colors : bool, default: True
            Output colors.

        Returns
        -------
        list of bytearray
            If `output_format` is 0 or 2 and `payload_format` is 4 or `'bytearray'`.
        list of dict
            If `output_format` is 2.
        list of str
            If `output_format` is 0 and `payload_format` is not 4 or `'bytearray'`.
        str
            If none of the above applied.

        Raises
        ------
        RuntimeError

            - If the `_packets.txt` file could not be found.
            - If `output_format` or `payload_format` was out of bound.
            - If `direction` was not `'A'`, `'B'` or `None`.
            - If `output_format` was 3 or `'reconstruct'` and `direction` was not `'A'` or `'B'`.
            - If `payload_format` was 4 or `'bytearray'` and `output_format` was 1 or `'info'`.

        subprocess.CalledProcessError
            If the process exited with a non-zero exit code.

        See Also
        --------
        t2py.T2Utils.T2Utils.follow_stream : Call `tawk` `follow_stream()` function with the specified options.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'basicStats', 'tcpFlags', 'txtSink'], packet_mode=True)
        >>> t2.tranalyzer2.SPKTMD_PCNTH = 1
        >>> t2.tranalyzer2.apply_changes()
        >>> t2.build()
        >>> t2.run()
        >>> t2.follow_stream(1, direction='B')
        [...,{"flow": 1, "packet": 125, ..., "payload": bytearray(b'226 Transfer complete.\\r\\n')},....]
        """
        packet_file = self.packet_file()
        if packet_file and isfile(packet_file):
            return T2Utils.follow_stream(
                packet_file, flow, output_format, direction, payload_format, reassembly, colors)
        raise RuntimeError("Run Tranalyzer with 'packet_mode=True' to create a packet file first")

    # =========================================================================

    def to_pandas(
            self,
            infile: str = None,
            delimiter: str = None
    ) -> "pandas.core.frame.DataFrame":
        """Load the flow file or `infile` into a pandas `DataFrame`.

        Parameters
        ----------
        infile : str, default: None
            Path to a flow or packet file.
            If not set, use the file returned by `T2.flow_file()`.
        delimiter : str, default: None
            Field delimiter used in the input file.

        Returns
        -------
        pd.DataFrame
            DataFrame holding the tabular data stored in `infile`.

        Raises
        ------
        RuntimeError
            If `infile` was not specified and no flow file could be found.

        Examples
        --------
        >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
        >>> t2.build()
        >>> t2.run()
        >>> df = t2.to_pandas()
        >>> type(df)
        <class 'pandas.core.frame.DataFrame'>
        """
        source = infile or self.flow_file()
        if source:
            return T2Utils.to_pandas(source, delimiter)
        raise RuntimeError('Run Tranalyzer with the txtSink or jsonSink plugin to create a flow file first')

    # =========================================================================
    # Private functions
    # =========================================================================

    def _output_prefix(self) -> str:
        """Return the output prefix (derived from Tranalyzer2 input if not set).

        - If `output_prefix` is an existing directory and a pcap is set, the prefix is
          the pcap filename without its last extension, located in that directory.
        - If `output_prefix` is set otherwise, it is returned unchanged.
        - If only a pcap is set, the prefix is the pcap path without its last extension.

        Returns
        -------
        str
            The output_prefix, or `None` if neither `output_prefix` nor `pcap` is set.
        """
        def strip_ext(text):
            # Drop everything from the last '.' on; text without '.' is kept unchanged
            stem, dot, _ = text.rpartition('.')
            return stem if dot else text

        prefix, pcap = self.output_prefix, self.pcap
        if not prefix:
            # NOTE(review): the last '.' of the whole path is used here, so a dot in a
            # directory name (e.g., '/tmp/my.dir/file') is handled like an extension.
            # Confirm this matches how Tranalyzer2 derives its own prefix.
            return strip_ext(pcap) if pcap else None
        if not pcap or not isdir(prefix):
            return prefix
        return join(prefix, strip_ext(pcap.rpartition('/')[2]))

    def _add_dict_entry(self, plugin: str):
        """Expose the `plugin` entry of `_plugins` as an instance attribute of the same name.

        An instance attribute which already exists under that name is left untouched.
        """
        attrs = vars(self)
        if plugin not in attrs:
            attrs[plugin] = self._plugins[plugin]

    def _add_dict_entries(self, plugins: List[str]):
        """Call `_add_dict_entry()` for each plugin name of `plugins`, in order."""
        for name in plugins:
            self._add_dict_entry(name)

    def _remove_dict_entry(self, plugin: str):
        """Delete the instance attribute named `plugin`, if there is one."""
        vars(self).pop(plugin, None)

    def _remove_dict_entries(self, plugins: List[str]):
        """Call `_remove_dict_entry()` for each plugin name of `plugins`, in order."""
        for name in plugins:
            self._remove_dict_entry(name)

Class variables

var plugin_folder : str

Path to the plugin folder. If None, default to '$HOME/.tranalyzer/plugins'.

Instance variables

var default_plugin_folder : str

Path to the default plugin folder, i.e., $HOME/.tranalyzer/plugins.

Expand source code
@property
def default_plugin_folder(self) -> str:
    """Path to the default plugin folder, i.e., `$HOME/.tranalyzer/plugins`."""
    folder = self._default_plugin_folder
    if not folder:
        # Computed on first access, then cached on the object
        folder = join(expanduser('~'), '.tranalyzer', 'plugins')
        self._default_plugin_folder = folder
    return folder
var loading_list

Get or set the path to a plugin loading list.

In addition, the setter calls the T2.add_plugin() function for each plugin listed in the file.

Expand source code
@property
def loading_list(self):
    """Get or set the path to a plugin loading list.

    In addition, the setter calls the `T2.add_plugin()` function for each plugin listed in the file.
    """
    # Getter only: returns the stored value as is (the setter is defined separately)
    return self._loading_list
var plugins : Dict[str, T2Plugin]

Dictionary of t2py.T2Plugin objects whose key is the plugin name.

Expand source code
@property
def plugins(self) -> Dict[str, T2Plugin]:
    """Dictionary of `t2py.T2Plugin` objects whose key is the plugin name."""
    # The internal dictionary itself is returned, not a copy
    return self._plugins
var streaming

Allow tranalyzer2 to run in streaming mode (call T2.stream() to access flows one by one).

Expand source code
@property
def streaming(self):
    """Allow `tranalyzer2` to run in streaming mode (call `T2.stream()` to access flows one by one)."""
    # Getter only: returns the stored flag as is (the setter is defined separately)
    return self._streaming
var t2_exec : str

Path to the Tranalyzer2 executable.

Expand source code
@property
def t2_exec(self) -> str:
    """Path to the Tranalyzer2 executable.

    The executable is `bin/tranalyzer` in `plugin_folder` if set, else in `default_plugin_folder`.
    """
    folder = self.plugin_folder or self.default_plugin_folder
    return join(folder, 'bin', 'tranalyzer')
var tranalyzer2 : T2Plugin

t2py.T2Plugin object representing Tranalyzer2 (core).

Expand source code
@property
def tranalyzer2(self) -> T2Plugin:
    """`t2py.T2Plugin` object representing Tranalyzer2 (core)."""
    core = self._tranalyzer2
    if not core:
        # Created on first access, then cached on the object
        core = self._tranalyzer2 = T2Plugin('tranalyzer2')
    return core

Methods

def add_output_format(self, extension: Union[str, List[str]])

Select and configure the sink plugin(s) required to generate the requested output format.

Parameters

extension : str or list of str

extension may be any of:

  • 'bin' -> binSink
  • 'bin.gz' -> binSink + GZ_COMPRESS = 1
  • 'csv' -> txtSink + SEP_CHR = ','
  • 'csv.gz' -> txtSink + SEP_CHR = ',' + GZ_COMPRESS = 1
  • 'json' -> jsonSink
  • 'json.gz' -> jsonSink + GZ_COMPRESS = 1
  • 'txt' -> txtSink
  • 'txt.gz' -> txtSink + GZ_COMPRESS = 1
  • 'netflow' -> netflowSink
  • 'mysql' -> mysqlSink
  • 'sqlite' -> sqliteSink
  • 'postgres' -> psqlSink
  • 'mongo' -> mongoSink
  • 'socket' -> socketSink
  • 'streaming' -> socketSink + CONTENT_TYPE = 2
  • 'pcap' -> pcapd

Raises

RuntimeError
If one of the specified extensions is not supported.

Examples

>>> t2 = T2()
>>> t2.add_output_format(['json.gz', 'txt'])
Expand source code
def add_output_format(
        self,
        extension: Union[str, List[str]]
):
    """Select and configure the sink plugin(s) required to generate the requested output format.

    Parameters
    ----------
    extension : str or list of str
        `extension` (or each element of it, if it is a list or a tuple) may be any of:

        - `'bin'`       -> binSink
        - `'bin.gz'`    -> binSink + `GZ_COMPRESS = 1`
        - `'csv'`       -> txtSink + `SEP_CHR = ','`
        - `'csv.gz'`    -> txtSink + `SEP_CHR = ','` + `GZ_COMPRESS = 1`
        - `'json'`      -> jsonSink
        - `'json.gz'`   -> jsonSink + `GZ_COMPRESS = 1`
        - `'txt'`       -> txtSink
        - `'txt.gz'`    -> txtSink + `GZ_COMPRESS = 1`
        - `'netflow'`   -> netflowSink
        - `'mysql'`     -> mysqlSink
        - `'sqlite'`    -> sqliteSink
        - `'postgres'`  -> psqlSink
        - `'mongo'`     -> mongoSink
        - `'socket'`    -> socketSink
        - `'streaming'` -> socketSink + `CONTENT_TYPE = 2`
        - `'pcap'`      -> pcapd

    Raises
    ------
    RuntimeError
        If one of the specified extensions is not supported.

    Examples
    --------
    >>> t2 = T2()
    >>> t2.add_output_format(['json.gz', 'txt'])
    """
    # extension -> (plugin to add, ((option, value), ...) to set on that plugin, in order)
    formats = {
        'bin': ('binSink', ()),
        'bin.gz': ('binSink', (('GZ_COMPRESS', 1),)),
        'csv': ('txtSink', (('SEP_CHR', ','),)),
        'csv.gz': ('txtSink', (('SEP_CHR', ','), ('GZ_COMPRESS', 1))),
        'txt': ('txtSink', ()),
        'txt.gz': ('txtSink', (('GZ_COMPRESS', 1),)),
        'json': ('jsonSink', ()),
        'json.gz': ('jsonSink', (('GZ_COMPRESS', 1),)),
        'netflow': ('netflowSink', ()),
        'mysql': ('mysqlSink', ()),
        'sqlite': ('sqliteSink', ()),
        'postgres': ('psqlSink', ()),
        'mongo': ('mongoSink', ()),
        'socket': ('socketSink', ()),
        'pcap': ('pcapd', ()),
    }

    if isinstance(extension, (list, tuple)):
        extensions = extension
    else:
        extensions = [extension]

    for ext in extensions:
        if ext == 'streaming':
            # Plugin selection/configuration for streaming is left to the `streaming` setter
            self.streaming = True
            continue
        # Only strings can name a format (this also keeps unhashable values out of the lookup)
        entry = formats.get(ext) if isinstance(ext, str) else None
        if entry is None:
            raise RuntimeError(f"output format '{ext}' not supported")
        plugin, options = entry
        self.add_plugin(plugin)
        for option, value in options:
            setattr(getattr(self, plugin), option, value)
def add_plugin(self, plugin: str)

Add a plugin to the list of plugins.

Parameters

plugin : str
The name of the plugin to add.

See Also

T2.add_plugins()
Add a list of plugins to the list of plugins.
T2.remove_plugin()
Remove a plugin from the list of plugins.
T2.remove_plugins()
Remove a list of plugins from the list of plugins.

Examples

>>> t2 = T2()
>>> t2.add_plugin('arpDecode')
>>> type(t2.arpDecode)
<class 't2py.T2Plugin.T2Plugin'>
>>> t2.arpDecode.number
'179'
Expand source code
def add_plugin(
        self,
        plugin: str
):
    """Register a single plugin on this object.

    A plugin which is already registered is left untouched.
    Otherwise, a new `t2py.T2Plugin` is created for it, stored under its name
    and the name is handed to `_add_dict_entry()`.

    Parameters
    ----------
    plugin : str
        Name of the plugin to register.

    See Also
    --------
    T2.add_plugins : Add a list of plugins to the list of plugins.
    T2.remove_plugin : Remove a plugin from the list of plugins.
    T2.remove_plugins : Remove a list of plugins from the list of plugins.

    Examples
    --------
    >>> t2 = T2()
    >>> t2.add_plugin('arpDecode')
    >>> type(t2.arpDecode)
    <class 't2py.T2Plugin.T2Plugin'>
    >>> t2.arpDecode.number
    '179'
    """
    # Already known: keep the existing object (and its pending configuration).
    if plugin in self._plugins:
        return
    self._plugins[plugin] = T2Plugin(plugin)
    self._add_dict_entry(plugin)
def add_plugins(self, plugins: List[str])

Add a list of plugins to the list of plugins.

Parameters

plugins : list of str
The list of plugins to add.

See Also

T2.add_plugin()
Add a plugin to the list of plugins.
T2.remove_plugin()
Remove a plugin from the list of plugins.
T2.remove_plugins()
Remove a list of plugins from the list of plugins.

Examples

>>> t2 = T2()
>>> t2.add_plugins(['arpDecode', 'basicStats'])
>>> type(t2.arpDecode)
<class 't2py.T2Plugin.T2Plugin'>
>>> t2.basicStats.description
'Basic statistics'
Expand source code
def add_plugins(
        self,
        plugins: List[str]
):
    """Register several plugins, one after the other.

    Each name is forwarded, in order, to `T2.add_plugin()`.

    Parameters
    ----------
    plugins : list of str
        Names of the plugins to register.

    See Also
    --------
    T2.add_plugin : Add a plugin to the list of plugins.
    T2.remove_plugin : Remove a plugin from the list of plugins.
    T2.remove_plugins : Remove a list of plugins from the list of plugins.

    Examples
    --------
    >>> t2 = T2()
    >>> t2.add_plugins(['arpDecode', 'basicStats'])
    >>> type(t2.arpDecode)
    <class 't2py.T2Plugin.T2Plugin'>
    >>> t2.basicStats.description
    'Basic statistics'
    """
    for name in plugins:
        self.add_plugin(name)
def apply_changes(self, verbose: bool = False)

Apply/persist the current configuration to the source files.

Parameters

verbose : bool, default: False
  • If True, print the output (stdout and stderr) of the command.
  • If False, do not print anything.

Raises

subprocess.CalledProcessError
If one of the processes exited with a non-zero exit code.

Examples

>>> t2 = T2(output_format='csv.gz')
>>> t2.apply_changes()
Expand source code
def apply_changes(
        self,
        verbose: bool = False
):
    """Persist the current configuration to the source files.

    The changes of Tranalyzer2 itself (`self.tranalyzer2`) are applied first,
    then those of every registered plugin.

    Parameters
    ----------
    verbose : bool, default: False

        - If `True`, print the output (`stdout` and `stderr`) of the command.
        - If `False`, do not print anything.

    Raises
    ------
    subprocess.CalledProcessError
        If one of the processes exited with a non-zero exit code.

    Examples
    --------
    >>> t2 = T2(output_format='csv.gz')
    >>> t2.apply_changes()
    """
    self.tranalyzer2.apply_changes(verbose=verbose)
    # Only the plugin objects are needed, not their names.
    for plugin in self._plugins.values():
        plugin.apply_changes(verbose=verbose)
def build(self, plugin: Union[str, List[str]] = None, plugin_folder: str = None, force_rebuild: bool = False, debug: bool = False, verbose: bool = False)

Build Tranalyzer2 and the plugins.

Parameters

plugin : str or list of str, default: None
List of plugin names to build or path to a plugin loading list. If None, use T2.list_plugins().
plugin_folder : str, default: None
Path to the plugin folder. If None, use T2.plugin_folder.
force_rebuild : bool, default: False
Force the rebuild of the configuration files ($ t2build -r ...).
debug : bool, default: False
Build the plugin in debug mode.
verbose : bool, default: False
  • If True, print the output (stdout and stderr) of the command.
  • If False, do not print anything.

Raises

subprocess.CalledProcessError
If the process exited with a non-zero exit code.

See Also

T2.clean()
Remove files generated by T2.build().

Examples

>>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags'])
>>> t2.build()
Expand source code
def build(
        self,
        plugin: Union[str, List[str]] = None,
        plugin_folder: str = None,
        force_rebuild: bool = False,
        debug: bool = False,
        verbose: bool = False
):
    """Build Tranalyzer2 and the plugins.

    The actual work is delegated to `T2Utils.build()`.

    Parameters
    ----------
    plugin : str or list of str, default: None
        List of plugin names to build or path to a plugin loading list.
        If `None` (or empty), all the plugins registered on this object are built,
        followed by `'tranalyzer2'`.
    plugin_folder : str, default: None
        Path to the plugin folder.
        If `None` (or empty), use `T2.plugin_folder`.
    force_rebuild : bool, default: False
        Force the rebuild of the configuration files (`$ t2build -r ...`).
    debug : bool, default: False
        Build the plugin in debug mode.
    verbose : bool, default: False

        - If `True`, print the output (`stdout` and `stderr`) of the command.
        - If `False`, do not print anything.

    Raises
    ------
    subprocess.CalledProcessError
        If the process exited with a non-zero exit code.

    See Also
    --------
    T2.clean : Remove files generated by `T2.build()`.

    Examples
    --------
    >>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags'])
    >>> t2.build()
    """
    folder = plugin_folder or self.plugin_folder
    if plugin:
        # Explicit selection: passed through as given, the core is NOT added.
        targets = plugin
    else:
        # Default selection: every registered plugin plus the core.
        targets = [*self._plugins, 'tranalyzer2']
    T2Utils.build(targets, folder, force_rebuild, debug, verbose=verbose)
def clean(self, plugin: Union[str, List[str]] = None, verbose: bool = False)

Clean Tranalyzer2 and the plugins.

Parameters

plugin : str or list of str, default: None
List of plugin names to clean, or path to a plugin loading list. If None, use T2.list_plugins().
verbose : bool, default: False
  • If True, print the output (stdout and stderr) of the command.
  • If False, do not print anything.

Raises

subprocess.CalledProcessError
If the process exited with a non-zero exit code.

See Also

T2.build()
Build Tranalyzer2 and the plugins.

Examples

>>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags'])
>>> t2.build()
>>> t2.clean()
Expand source code
def clean(
        self,
        plugin: Union[str, List[str]] = None,
        verbose: bool = False
):
    """Clean Tranalyzer2 and the plugins.

    The actual work is delegated to `T2Utils.clean()`.

    Parameters
    ----------
    plugin : str or list of str, default: None
        List of plugin names to clean, or path to a plugin loading list.
        If `None` (or empty), all the plugins registered on this object are cleaned,
        followed by `'tranalyzer2'`.
    verbose : bool, default: False

        - If `True`, print the output (`stdout` and `stderr`) of the command.
        - If `False`, do not print anything.

    Raises
    ------
    subprocess.CalledProcessError
        If the process exited with a non-zero exit code.

    See Also
    --------
    T2.build : Build Tranalyzer2 and the plugins.

    Examples
    --------
    >>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags'])
    >>> t2.build()
    >>> t2.clean()
    """
    if plugin:
        # Explicit selection: passed through as given, the core is NOT added.
        targets = plugin
    else:
        # Default selection: every registered plugin plus the core.
        targets = [*self._plugins, 'tranalyzer2']
    T2Utils.clean(targets, verbose=verbose)
def clear_plugins(self)

Clear the list of plugins.

Examples

>>> t2 = T2(plugins=['arpDecode', 'basicStats'])
>>> t2.clear_plugins()
>>> t2.list_plugins()
[]
Expand source code
def clear_plugins(self):
    """Remove every plugin from the list of plugins.

    The names of all registered plugins are first handed to
    `_remove_dict_entries()`, then the internal plugin mapping is emptied.

    Examples
    --------
    >>> t2 = T2(plugins=['arpDecode', 'basicStats'])
    >>> t2.clear_plugins()
    >>> t2.list_plugins()
    []
    """
    # Snapshot the names before the mapping is emptied.
    names = list(self._plugins)
    self._remove_dict_entries(names)
    self._plugins.clear()
def create_plugin_list(self, plugins: List[str] = None, outfile: str = None, verbose: bool = False)

Create a loading list of plugins.

Parameters

plugins : list of str, default: None
List of plugin names. If None, use T2.list_plugins().
outfile : str, default: None
Name of the file to save. If None, default to 'plugins.load' in the plugin folder.
verbose : bool, default: False
  • If True, print the output (stdout and stderr) of the command.
  • If False, do not print anything.

Raises

subprocess.CalledProcessError
If the process exited with a non-zero exit code.

Examples

>>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags'])
>>> t2.create_plugin_list()
>>> t2.create_plugin_list(outfile='/tmp/myPlugins.txt')
>>> t2.create_plugin_list(['basicFlow', 'txtSink'])
>>> t2.create_plugin_list(['basicFlow', 'txtSink'], '/tmp/myPlugins.txt')
Expand source code
def create_plugin_list(
        self,
        plugins: List[str] = None,
        outfile: str = None,
        verbose: bool = False
):
    """Create a loading list of plugins.

    The actual work is delegated to `T2Utils.create_plugin_list()`.

    Parameters
    ----------
    plugins : list of str, default: None
        List of plugin names.
        If `None` (or empty), use the plugins registered on this object.
    outfile : str, default: None
        Name of the file to save.
        If `None` (or empty), default to `'plugins.load'` in `T2.plugin_folder`,
        or in the default plugin folder if `T2.plugin_folder` is not set.
    verbose : bool, default: False

        - If `True`, print the output (`stdout` and `stderr`) of the command.
        - If `False`, do not print anything.

    Raises
    ------
    subprocess.CalledProcessError
        If the process exited with a non-zero exit code.

    Examples
    --------
    >>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags'])
    >>> t2.create_plugin_list()
    >>> t2.create_plugin_list(outfile='/tmp/myPlugins.txt')
    >>> t2.create_plugin_list(['basicFlow', 'txtSink'])
    >>> t2.create_plugin_list(['basicFlow', 'txtSink'], '/tmp/myPlugins.txt')
    """
    if not outfile:
        # Prefer the folder configured on this object over the default one.
        folder = self.plugin_folder or self.default_plugin_folder
        outfile = join(folder, 'plugins.load')
    names = plugins if plugins else list(self._plugins)
    T2Utils.create_plugin_list(names, outfile, verbose=verbose)
def discard_changes(self)

Discard all non-committed changes made in this session.

Examples

>>> t2 = T2(plugins=['arpDecode', 'txtSink'])
>>> t2.arpDecode.MAX_IP = 5
>>> t2.discard_changes()
Expand source code
def discard_changes(self):
    """Discard all non-committed changes made in this session.

    Mirrors `T2.apply_changes()`: the pending changes of Tranalyzer2 itself
    (`self.tranalyzer2`) are discarded first, then those of every registered plugin.

    See Also
    --------
    T2.apply_changes : Apply/persist the current configuration to the source files.

    Examples
    --------
    >>> t2 = T2(plugins=['arpDecode', 'txtSink'])
    >>> t2.arpDecode.MAX_IP = 5
    >>> t2.discard_changes()
    """
    # The core is not part of self._plugins, so it must be handled explicitly,
    # otherwise its pending changes would survive a "discard all".
    self.tranalyzer2.discard_changes()
    for plugin in self._plugins.values():
        plugin.discard_changes()
def flow_file(self) ‑> str

Return the filename of the JSON (preferred) or TXT flow file.

Returns

str
The filename of the flow file.

See Also

T2.flows()
Return Tranalyzer2 _flows.json (preferred) or _flows.txt file contents as an array of JSON objets.
T2.print_flows()
Print the contents of Tranalyzer2 _flows.json (preferred) or _flows.txt file.
T2.flow_file_json()
Return the filename of the JSON flow file.
T2.flows_json()
Return Tranalyzer2 _flows.json file contents as an array of JSON objets.
T2.print_flows_json()
Print the contents of Tranalyzer2 _flows.json file.
T2.flow_file_txt()
Return the filename of the TXT flow file.
T2.flows_txt()
Return Tranalyzer2 _flows.txt file contents as an array of JSON objets.
T2.print_flows_txt()
Print the contents of Tranalyzer2 _flows.txt file.

Examples

>>> t2 = T2(pcap='/tmp/file.pcap', output_format=['json', 'txt'])
>>> t2.run()
>>> t2.flow_file()
'/tmp/file_flows.json'
Expand source code
def flow_file(self) -> str:
    """Return the filename of the JSON (preferred) or TXT flow file.

    The choice is based on the registered plugins: `jsonSink` wins over `txtSink`.

    Returns
    -------
    str
        The filename of the flow file, or `None` if neither the `jsonSink`
        nor the `txtSink` plugin is registered.

    See Also
    --------
    T2.flows : Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objects.
    T2.print_flows : Print the contents of Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file.
    T2.flow_file_json : Return the filename of the JSON flow file.
    T2.flows_json : Return Tranalyzer2 `_flows.json` file contents as an array of JSON objects.
    T2.print_flows_json : Print the contents of Tranalyzer2 `_flows.json` file.
    T2.flow_file_txt : Return the filename of the TXT flow file.
    T2.flows_txt : Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objects.
    T2.print_flows_txt : Print the contents of Tranalyzer2 `_flows.txt` file.

    Examples
    --------
    >>> t2 = T2(pcap='/tmp/file.pcap', output_format=['json', 'txt'])
    >>> t2.run()
    >>> t2.flow_file()
    '/tmp/file_flows.json'
    """
    loaded = self._plugins
    if 'jsonSink' in loaded:
        return self.flow_file_json()
    if 'txtSink' in loaded:
        return self.flow_file_txt()
    # No sink plugin able to produce a JSON/TXT flow file.
    return None
def flow_file_json(self) ‑> str

Return the filename of the JSON flow file.

Returns

str
The filename of the JSON flow file.

See Also

T2.flows_json()
Return Tranalyzer2 _flows.json file contents as an array of JSON objets.
T2.print_flows_json()
Print the contents of Tranalyzer2 _flows.json file.
T2.flow_file_txt()
Return the filename of the TXT flow file.
T2.flows_txt()
Return Tranalyzer2 _flows.txt file contents as an array of JSON objets.
T2.print_flows_txt()
Print the contents of Tranalyzer2 _flows.txt file.
T2.flow_file()
Return the filename of the JSON (preferred) or TXT flow file.
T2.flows()
Return Tranalyzer2 _flows.json (preferred) or _flows.txt file contents as an array of JSON objets.
T2.print_flows()
Print the contents of Tranalyzer2 _flows.json (preferred) or _flows.txt file.

Examples

>>> t2 = T2(pcap='/tmp/file.pcap', output_format='json')
>>> t2.run()
>>> t2.flow_file_json()
'/tmp/file_flows.json'
Expand source code
def flow_file_json(self) -> str:
    """Return the filename of the JSON flow file.

    The name is the output prefix (`_output_prefix()`) followed by `'_flows.json'`.

    Returns
    -------
    str
        The filename of the JSON flow file, or `None` if no output prefix is available.

    See Also
    --------
    T2.flows_json : Return Tranalyzer2 `_flows.json` file contents as an array of JSON objects.
    T2.print_flows_json : Print the contents of Tranalyzer2 `_flows.json` file.
    T2.flow_file_txt : Return the filename of the TXT flow file.
    T2.flows_txt : Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objects.
    T2.print_flows_txt : Print the contents of Tranalyzer2 `_flows.txt` file.
    T2.flow_file : Return the filename of the JSON (preferred) or TXT flow file.
    T2.flows : Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objects.
    T2.print_flows : Print the contents of Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file.

    Examples
    --------
    >>> t2 = T2(pcap='/tmp/file.pcap', output_format='json')
    >>> t2.run()
    >>> t2.flow_file_json()
    '/tmp/file_flows.json'
    """
    prefix = self._output_prefix()
    if not prefix:
        return None
    return prefix + '_flows.json'
def flow_file_txt(self) ‑> str

Return the filename of the TXT flow file.

Returns

str
The filename of the TXT flow file.

See Also

T2.flows_txt()
Return Tranalyzer2 _flows.txt file contents as an array of JSON objets.
T2.print_flows_txt()
Print the contents of Tranalyzer2 _flows.txt file.
T2.flow_file_json()
Return the filename of the JSON flow file.
T2.flows_json()
Return Tranalyzer2 _flows.json file contents as an array of JSON objets.
T2.print_flows_json()
Print the contents of Tranalyzer2 _flows.json file.
T2.flow_file()
Return the filename of the JSON (preferred) or TXT flow file.
T2.flows()
Return Tranalyzer2 _flows.json (preferred) or _flows.txt file contents as an array of JSON objets.
T2.print_flows()
Print the contents of Tranalyzer2 _flows.json (preferred) or _flows.txt file.

Examples

>>> t2 = T2(pcap='/tmp/file.pcap', output_format='csv')
>>> t2.run()
>>> t2.flow_file_txt()
'/tmp/file_flows.txt'
Expand source code
def flow_file_txt(self) -> str:
    """Return the filename of the TXT flow file.

    The name is the output prefix (`_output_prefix()`) followed by `'_flows.txt'`.

    Returns
    -------
    str
        The filename of the TXT flow file, or `None` if no output prefix is available.

    See Also
    --------
    T2.flows_txt : Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objects.
    T2.print_flows_txt : Print the contents of Tranalyzer2 `_flows.txt` file.
    T2.flow_file_json : Return the filename of the JSON flow file.
    T2.flows_json : Return Tranalyzer2 `_flows.json` file contents as an array of JSON objects.
    T2.print_flows_json : Print the contents of Tranalyzer2 `_flows.json` file.
    T2.flow_file : Return the filename of the JSON (preferred) or TXT flow file.
    T2.flows : Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objects.
    T2.print_flows : Print the contents of Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file.

    Examples
    --------
    >>> t2 = T2(pcap='/tmp/file.pcap', output_format='csv')
    >>> t2.run()
    >>> t2.flow_file_txt()
    '/tmp/file_flows.txt'
    """
    prefix = self._output_prefix()
    if not prefix:
        return None
    return prefix + '_flows.txt'
def flows(self) ‑> List[Dict[str, Any]]

Return Tranalyzer2 _flows.json (preferred) or _flows.txt file contents as an array of JSON objects.

Returns

list of dict
The list of rows contained in the flow file. Each row is represented as a dict with the column names as key.

Raises

RuntimeError
If neither the _flows.json nor the _flows.txt file could be found.

See Also

T2.flow_file()
Return the filename of the JSON (preferred) or TXT flow file.
T2.flow_file_json()
Return the filename of the JSON flow file.
T2.flow_file_txt()
Return the filename of the TXT flow file.
T2.flows_json()
Return Tranalyzer2 _flows.json file contents as an array of JSON objets.
T2.flows_txt()
Return Tranalyzer2 _flows.txt file contents as an array of JSON objets.
T2.print_flows()
Print the contents of Tranalyzer2 _flows.json (preferred) or _flows.txt file.
T2.print_flows_json()
Print the contents of Tranalyzer2 _flows.json file.
T2.print_flows_txt()
Print the contents of Tranalyzer2 _flows.txt file.

Examples

>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
>>> t2.build()
>>> t2.run()
>>> t2.flows()
[{'dir': 'A', 'flowInd': 1, ...}, {'dir': 'B', 'flowInd': 1, ...}, ...]
Expand source code
def flows(self) -> List[Dict[str, Any]]:
    """Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objects.

    The choice is based on the registered plugins: `jsonSink` wins over `txtSink`.

    Returns
    -------
    list of dict
        The list of rows contained in the flow file.
        Each row is represented as a dict with the column names as key.

    Raises
    ------
    RuntimeError
        If neither the `jsonSink` nor the `txtSink` plugin is registered,
        or if the selected flow file could not be found.

    See Also
    --------
    T2.flow_file : Return the filename of the JSON (preferred) or TXT flow file.
    T2.flow_file_json : Return the filename of the JSON flow file.
    T2.flow_file_txt : Return the filename of the TXT flow file.
    T2.flows_json : Return Tranalyzer2 `_flows.json` file contents as an array of JSON objects.
    T2.flows_txt : Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objects.
    T2.print_flows : Print the contents of Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file.
    T2.print_flows_json : Print the contents of Tranalyzer2 `_flows.json` file.
    T2.print_flows_txt : Print the contents of Tranalyzer2 `_flows.txt` file.

    Examples
    --------
    >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
    >>> t2.build()
    >>> t2.run()
    >>> t2.flows()
    [{'dir': 'A', 'flowInd': 1, ...}, {'dir': 'B', 'flowInd': 1, ...}, ...]
    """
    loaded = self._plugins
    if 'jsonSink' in loaded:
        return self.flows_json()
    if 'txtSink' in loaded:
        return self.flows_txt()
    raise RuntimeError('Run Tranalyzer with the txtSink or jsonSink plugin to create a flow file first')
def flows_json(self) ‑> List[Dict[str, Any]]

Return Tranalyzer2 _flows.json file contents as an array of JSON objects.

Returns

list of dict
The list of rows contained in the JSON flow file. Each row is represented as a dict with the column names as key.

Raises

RuntimeError
If the _flows.json file could not be found.

See Also

T2.flow_file_json()
Return the filename of the JSON flow file.
T2.flows()
Return Tranalyzer2 _flows.json (preferred) or _flows.txt file contents as an array of JSON objets.
T2.flows_txt()
Return Tranalyzer2 _flows.txt file contents as an array of JSON objets.
T2.flow_file()
Return the filename of the JSON (preferred) or TXT flow file.
T2.flow_file_txt()
Return the filename of the TXT flow file.

Examples

>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'jsonSink'])
>>> t2.build()
>>> t2.run()
>>> t2.flows_json()
[{'dir': 'A', 'flowInd': 1, ...}, {'dir': 'B', 'flowInd': 1, ...}, ...]
Expand source code
def flows_json(self) -> List[Dict[str, Any]]:
    """Return Tranalyzer2 `_flows.json` file contents as an array of JSON objects.

    The file is located with `T2.flow_file_json()` and loaded with `T2Utils.to_json_array()`.

    Returns
    -------
    list of dict
        The list of rows contained in the JSON flow file.
        Each row is represented as a dict with the column names as key.

    Raises
    ------
    RuntimeError
        If the `_flows.json` file could not be found.

    See Also
    --------
    T2.flow_file_json : Return the filename of the JSON flow file.
    T2.flows : Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objects.
    T2.flows_txt : Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objects.
    T2.flow_file : Return the filename of the JSON (preferred) or TXT flow file.
    T2.flow_file_txt : Return the filename of the TXT flow file.

    Examples
    --------
    >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'jsonSink'])
    >>> t2.build()
    >>> t2.run()
    >>> t2.flows_json()
    [{'dir': 'A', 'flowInd': 1, ...}, {'dir': 'B', 'flowInd': 1, ...}, ...]
    """
    path = self.flow_file_json()
    # Both "no filename could be derived" and "file not on disk" are reported the same way.
    if not (path and isfile(path)):
        raise RuntimeError('Run Tranalyzer with the jsonSink plugin to create a JSON flow file first')
    return T2Utils.to_json_array(path)
def flows_txt(self, delimiter: str = None) ‑> List[Dict[str, Any]]

Return Tranalyzer2 _flows.txt file contents as an array of JSON objects.

Parameters

delimiter : str, default: None
The delimiter used to separate columns in the input file. If None, default to '\t' if the txtSink plugin is not loaded else to the value of txtSink.SEP_CHR.

Returns

list of dict
The list of rows contained in the TXT flow file. Each row is represented as a dict with the column names as key.

Raises

RuntimeError
If the _flows.txt file could not be found.

See Also

T2.flow_file_txt()
Return the filename of the TXT flow file.
T2.flows()
Return Tranalyzer2 _flows.json (preferred) or _flows.txt file contents as an array of JSON objets.
T2.flows_json()
Return Tranalyzer2 _flows.json file contents as an array of JSON objets.
T2.flow_file()
Return the filename of the JSON (preferred) or TXT flow file.
T2.flow_file_json()
Return the filename of the JSON flow file.

Examples

>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
>>> t2.build()
>>> t2.run()
>>> t2.flows_txt()
[{'dir': 'A', 'flowInd': 1, ...}, {'dir': 'B', 'flowInd': 1, ...}, ...]
Expand source code
def flows_txt(
        self,
        delimiter: str = None
) -> List[Dict[str, Any]]:
    """Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objects.

    The file is located with `T2.flow_file_txt()` and loaded with `T2Utils.to_json_array()`.

    Parameters
    ----------
    delimiter : str, default: None
        The delimiter used to separate columns in the input file.
        If `None`, default to '\\t' if the txtSink plugin is not loaded else to the value of `txtSink.SEP_CHR`
        (surrounding double quotes are removed and a leading `\\\\t`, `\\\\n`, `\\\\r` or `\\\\\\\\`
        escape sequence is converted to the character it denotes).

    Returns
    -------
    list of dict
        The list of rows contained in the TXT flow file.
        Each row is represented as a dict with the column names as key.

    Raises
    ------
    RuntimeError
        If the `_flows.txt` file could not be found.

    See Also
    --------
    T2.flow_file_txt : Return the filename of the TXT flow file.
    T2.flows : Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objects.
    T2.flows_json : Return Tranalyzer2 `_flows.json` file contents as an array of JSON objects.
    T2.flow_file : Return the filename of the JSON (preferred) or TXT flow file.
    T2.flow_file_json : Return the filename of the JSON flow file.

    Examples
    --------
    >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
    >>> t2.build()
    >>> t2.run()
    >>> t2.flows_txt()
    [{'dir': 'A', 'flowInd': 1, ...}, {'dir': 'B', 'flowInd': 1, ...}, ...]
    """
    name = self.flow_file_txt()
    if not name or not isfile(name):
        raise RuntimeError('Run Tranalyzer with the txtSink plugin to create a flow file first')
    if delimiter is None:
        if 'txtSink' not in self._plugins:
            delimiter = '\t'
        else:
            # Two-character backslash escapes and the character each one stands for.
            escapes = {'\\t': '\t', '\\n': '\n', '\\r': '\r', '\\\\': '\\'}
            delimiter = self.txtSink.SEP_CHR.strip('"')
            # Look up the first two characters instead of indexing [0] and [1]:
            # an empty value or a lone backslash no longer raises IndexError and
            # is passed on unchanged, like any other unrecognised value.
            delimiter = escapes.get(delimiter[:2], delimiter)
    return T2Utils.to_json_array(name, delimiter)
def follow_stream(self, flow: int, output_format: Union[int, str] = 2, direction: str = None, payload_format: Union[int, str] = 4, reassembly: bool = True, colors: bool = True) ‑> Union[str, List[str], List[bytearray], List[Dict[str, Any]]]

Return the reassembled payload of the flow with specified index and direction.

Parameters

flow : int
Index (flowInd) of the flow to analyse.
output_format : int or str, default: 2

The format to use for the output:

  • 0 or 'payload_only'
  • 1 or 'info': Prefix each payload with packet/flow info
  • 2 or 'json'
  • 3 or 'reconstruct'
direction : str, default: None
The direction to follow ('A' or 'B'). Use None to follow both directions.
payload_format : int or str, default: 4

The format to use for the payload:

  • 0 or 'ascii'
  • 1 or 'hexdump'
  • 2, 'raw' or 'binary'
  • 3 or 'base64'
  • 4 or 'bytearray'
reassembly : bool, default: True
Analyze TCP sequence numbers, reassemble and reorder TCP segments.
colors : bool, default: True
Output colors.

Returns

list of bytearray
If output_format is 0 or 2 and payload_format is 4 or 'bytearray'.
list of dict
If output_format is 2.
list of str
If output_format is 0 and payload_format is not 4 or 'bytearray'.
str
If none of the above applied.

Raises

RuntimeError
  • If the _packets.txt file could not be found.
  • If output_format or payload_format was out of bound.
  • If direction was not 'A', 'B' or None.
  • If output_format was 3 or 'reconstruct' and direction was not 'A' or 'B'.
  • If payload_format was 4 or 'bytearray' and output_format was 1 or 'info'.
subprocess.CalledProcessError
If the process exited with a non-zero exit code.

See Also

T2Utils.follow_stream()
Call tawk follow_stream() function with the specified options.

Examples

>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'basicStats', 'tcpFlags', 'txtSink'], packet_mode=True)
>>> t2.tranalyzer2.SPKTMD_PCNTH = 1
>>> t2.tranalyzer2.apply_changes()
>>> t2.build()
>>> t2.run()
>>> t2.follow_stream(1, direction='B')
[...,{"flow": 1, "packet": 125, ..., "payload": bytearray(b'226 Transfer complete.\r\n')},....]
Expand source code
def follow_stream(
        self,
        flow: int,
        output_format: Union[int, str] = 2,
        direction: str = None,
        payload_format: Union[int, str] = 4,
        reassembly: bool = True,
        colors: bool = True
) -> Union[str, List[str], List[bytearray], List[Dict[str, Any]]]:
    """Return the reassembled payload of the flow with specified index (`flow`) and `direction`.

    The packet file is located with `T2.packet_file()` and, together with all the
    parameters, handed to `T2Utils.follow_stream()` whose result is returned as is.

    Parameters
    ----------
    flow : int
        Index (`flowInd`) of the flow to analyse.
    output_format : int or str, default: 2
        The format to use for the output:

        - 0 or `'payload_only'`
        - 1 or `'info'`: Prefix each payload with packet/flow info
        - 2 or `'json'`
        - 3 or `'reconstruct'`

    direction : str, default: None
        The direction to follow (`'A'` or `'B'`).
        Use `None` to follow both directions.
    payload_format : int or str, default: 4
        The format to use for the payload:

        - 0 or 'ascii'
        - 1 or 'hexdump'
        - 2, 'raw' or 'binary'
        - 3 or 'base64'
        - 4 or 'bytearray'

    reassembly : bool, default: True
        Analyze TCP sequence numbers, reassemble and reorder TCP segments.
    colors : bool, default: True
        Output colors.

    Returns
    -------
    list of bytearray
        If `output_format` is 0 or 2 and `payload_format` is 4 or `'bytearray'`.
    list of dict
        If `output_format` is 2.
    list of str
        If `output_format` is 0 and `payload_format` is not 4 or `'bytearray'`.
    str
        If none of the above applied.

    Raises
    ------
    RuntimeError

        - If the `_packets.txt` file could not be found.
        - If `output_format` or `payload_format` was out of bound.
        - If `direction` was not `'A'`, `'B'` or `None`.
        - If `output_format` was 3 or `'reconstruct'` and `direction` was not `'A'` or `'B'`.
        - If `payload_format` was 4 or `'bytearray'` and `output_format` was 1 or `'info'`.

    subprocess.CalledProcessError
        If the process exited with a non-zero exit code.

    See Also
    --------
    t2py.T2Utils.T2Utils.follow_stream : Call `tawk` `follow_stream()` function with the specified options.

    Examples
    --------
    >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'basicStats', 'tcpFlags', 'txtSink'], packet_mode=True)
    >>> t2.tranalyzer2.SPKTMD_PCNTH = 1
    >>> t2.tranalyzer2.apply_changes()
    >>> t2.build()
    >>> t2.run()
    >>> t2.follow_stream(1, direction='B')
    [...,{"flow": 1, "packet": 125, ..., "payload": bytearray(b'226 Transfer complete.\\r\\n')},....]
    """
    packets = self.packet_file()
    # Both "no filename could be derived" and "file not on disk" are reported the same way.
    if not (packets and isfile(packets)):
        raise RuntimeError("Run Tranalyzer with 'packet_mode=True' to create a packet file first")
    return T2Utils.follow_stream(
        packets,
        flow,
        output_format,
        direction,
        payload_format,
        reassembly,
        colors
    )
def headers(self) ‑> str

Return Tranalyzer2 _headers.txt file contents.

Returns

str
The raw contents of the header file.

Raises

RuntimeError
If the _headers.txt file could not be found.

See Also

T2.headers_file()
Return the filename of the headers file.
T2.print_headers()
Print the contents of Tranalyzer2 _headers.txt file.

Examples

>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
>>> t2.build()
>>> t2.run()
>>> t2.headers()
'# Date: 1628090651.486362 sec (Wed 04 Aug 2021 17:24:11 CEST)...'
Expand source code
def headers(self) -> str:
    """Return Tranalyzer2 `_headers.txt` file contents.

    The file is located with `T2.headers_file()` and read with `_load_file()`.

    Returns
    -------
    str
        The raw contents of the header file.

    Raises
    ------
    RuntimeError
        If the `_headers.txt` file could not be found.

    See Also
    --------
    T2.headers_file : Return the filename of the headers file.
    T2.print_headers : Print the contents of Tranalyzer2 `_headers.txt` file.

    Examples
    --------
    >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
    >>> t2.build()
    >>> t2.run()
    >>> t2.headers()
    '# Date: 1628090651.486362 sec (Wed 04 Aug 2021 17:24:11 CEST)...'
    """
    path = self.headers_file()
    # Both "no filename could be derived" and "file not on disk" are reported the same way.
    if not (path and isfile(path)):
        raise RuntimeError('Run Tranalyzer with the txtSink plugin to create a headers file first')
    return self._load_file(path)
def headers_file(self) ‑> str

Return the filename of the headers file.

Returns

str
The filename of the headers file.

See Also

T2.headers()
Return Tranalyzer2 _headers.txt file contents.
T2.print_headers()
Print the contents of Tranalyzer2 _headers.txt file.

Examples

>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
>>> t2.run()
>>> t2.headers_file()
'/tmp/file_headers.txt'
Expand source code
def headers_file(self) -> str:
    """Build the filename of the headers file from the output prefix.

    Returns
    -------
    str
        The output prefix followed by `'_headers.txt'`,
        or `None` when `self._output_prefix()` returns a falsy value.

    See Also
    --------
    T2.headers : Return Tranalyzer2 `_headers.txt` file contents.
    T2.print_headers : Print the contents of Tranalyzer2 `_headers.txt` file.

    Examples
    --------
    >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
    >>> t2.run()
    >>> t2.headers_file()
    '/tmp/file_headers.txt'
    """
    prefix = self._output_prefix()
    if not prefix:
        return None
    return prefix + '_headers.txt'
def list_plugins(self)

Alphabetically list the loaded plugins.

Examples

>>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags'])
>>> t2.list_plugins()
['arpDecode', 'basicStats', 'tcpFlags']
>>> t2.arpDecode.description
'Address Resolution Protocol (ARP)'
Expand source code
def list_plugins(self):
    """Return the names of the loaded plugins in alphabetical order.

    Examples
    --------
    >>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags'])
    >>> t2.list_plugins()
    ['arpDecode', 'basicStats', 'tcpFlags']
    >>> t2.arpDecode.description
    'Address Resolution Protocol (ARP)'
    """
    names = list(self._plugins.keys())
    names.sort()
    return names
def log(self) ‑> str

Return Tranalyzer2 _log.txt file contents.

Returns

str
The raw contents of the log file.

Raises

RuntimeError
If the _log.txt file could not be found.

See Also

T2.report()
Alias for T2.log()
T2.log_file()
Return the filename of the log file.

Examples

>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
>>> t2.build()
>>> t2.run()
>>> t2.log()
'================================================================================...'
Expand source code
def log(self) -> str:
    """Return Tranalyzer2 `_log.txt` file contents (delegates to `T2.report()`).

    Returns
    -------
    str
        The raw contents of the log file.

    Raises
    ------
    RuntimeError
        If the `_log.txt` file could not be found.

    See Also
    --------
    T2.report : The method this alias delegates to.
    T2.log_file : Return the filename of the log file.

    Examples
    --------
    >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
    >>> t2.build()
    >>> t2.run()
    >>> t2.log()
    '================================================================================...'
    """
    contents = self.report()
    return contents
def log_file(self) ‑> str

Return the filename of the log file.

Returns

str
The filename of the log file.

See Also

T2.log()
Return Tranalyzer2 _log.txt file contents.
T2.print_log()
Print the contents of Tranalyzer2 _log.txt file.
T2.report()
Alias for T2.log().
T2.print_report()
Alias for T2.print_log().

Examples

>>> t2 = T2(pcap='/tmp/file.pcap', output_format='txt')
>>> t2.run()
>>> t2.log_file()
'/tmp/file_log.txt'
Expand source code
def log_file(self) -> str:
    """Build the filename of the log file from the output prefix.

    Returns
    -------
    str
        The output prefix followed by `'_log.txt'`,
        or `None` when `self._output_prefix()` returns a falsy value.

    See Also
    --------
    T2.log : Return Tranalyzer2 `_log.txt` file contents.
    T2.print_log : Print the contents of Tranalyzer2 `_log.txt` file.
    T2.report : Alias for `T2.log()`.
    T2.print_report : Alias for `T2.print_log()`.

    Examples
    --------
    >>> t2 = T2(pcap='/tmp/file.pcap', output_format='txt')
    >>> t2.run()
    >>> t2.log_file()
    '/tmp/file_log.txt'
    """
    prefix = self._output_prefix()
    if not prefix:
        return None
    return prefix + '_log.txt'
def packet_file(self) ‑> str

Return the filename of the packet file.

Returns

str
The filename of the packet file.

See Also

T2.packets()
Return Tranalyzer2 _packets.txt file contents as an array of JSON objects.
T2.print_packets()
Print the contents of Tranalyzer2 _packets.txt.

Examples

>>> t2 = T2(pcap='/tmp/file.pcap')
>>> t2.run(packet_mode=True)
>>> t2.packet_file()
'/tmp/file_packets.txt'
Expand source code
def packet_file(self) -> str:
    """Build the filename of the packet file from the output prefix.

    Returns
    -------
    str
        The output prefix followed by `'_packets.txt'`,
        or `None` when `self._output_prefix()` returns a falsy value.

    See Also
    --------
    T2.packets : Return Tranalyzer2 `_packets.txt` file contents as an array of JSON objects.
    T2.print_packets : Print the contents of Tranalyzer2 `_packets.txt`.

    Examples
    --------
    >>> t2 = T2(pcap='/tmp/file.pcap')
    >>> t2.run(packet_mode=True)
    >>> t2.packet_file()
    '/tmp/file_packets.txt'
    """
    prefix = self._output_prefix()
    if not prefix:
        return None
    return prefix + '_packets.txt'
def packets(self) ‑> List[Dict[str, Any]]

Return Tranalyzer2 _packets.txt file contents as an array of JSON objects.

Returns

list of dict
The list of rows contained in the packet file. Each row is represented as a dict with the column names as key.

Raises

RuntimeError
If the _packets.txt file could not be found.

See Also

T2.packet_file()
Return the filename of the packet file.
T2.print_packets()
Print the contents of Tranalyzer2 _packets.txt file.

Examples

>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'], packet_mode=True)
>>> t2.build()
>>> t2.run()
>>> t2.packets()
[{'pktNo': 1, 'flowInd': 1, ...}, {'pktNo': 2, 'flowInd': 1, ...}, ...]
Expand source code
def packets(self) -> List[Dict[str, Any]]:
    """Load the Tranalyzer2 `_packets.txt` file as an array of JSON objects.

    The conversion itself is delegated to `T2Utils.to_json_array()`.

    Returns
    -------
    list of dict
        The list of rows contained in the packet file.
        Each row is represented as a dict with the column names as key.

    Raises
    ------
    RuntimeError
        If `T2.packet_file()` yields no filename or the file does not exist.

    See Also
    --------
    T2.packet_file : Return the filename of the packet file.
    T2.print_packets : Print the contents of Tranalyzer2 `_packets.txt` file.

    Examples
    --------
    >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'], packet_mode=True)
    >>> t2.build()
    >>> t2.run()
    >>> t2.packets()
    [{'pktNo': 1, 'flowInd': 1, ...}, {'pktNo': 2, 'flowInd': 1, ...}, ...]
    """
    path = self.packet_file()
    if path and isfile(path):
        return T2Utils.to_json_array(path)
    raise RuntimeError("Run Tranalyzer with 'packet_mode=True' to create a packet file first")
def print_flows(self)

Print the contents of Tranalyzer2 _flows.json (preferred) or _flows.txt file.

Raises

RuntimeError
If the flow file could not be found.

See Also

T2.flow_file()
Return the filename of the JSON (preferred) or TXT flow file.
T2.flows()
Return Tranalyzer2 _flows.json (preferred) or _flows.txt file contents as an array of JSON objects.

Examples

>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
>>> t2.build()
>>> t2.run()
>>> t2.print_flows()
%dir    flowInd ...
A       1       ...
B       1       ...
...
Expand source code
def print_flows(self):
    """Dump the Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file to stdout.

    The file to print is the one returned by `T2.flow_file()`.

    Raises
    ------
    RuntimeError
        If the flow file could not be found.

    See Also
    --------
    T2.flow_file : Return the filename of the JSON (preferred) or TXT flow file.
    T2.flows : Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objects.

    Examples
    --------
    >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
    >>> t2.build()
    >>> t2.run()
    >>> t2.print_flows()
    %dir    flowInd ...
    A       1       ...
    B       1       ...
    ...
    """
    path = self.flow_file()
    if not (path and isfile(path)):
        raise RuntimeError('Run Tranalyzer with the txtSink plugin to create a flow file first')
    with open(path) as handle:
        contents = handle.read()
    print(contents)
def print_flows_json(self)

Print the contents of Tranalyzer2 _flows.json file.

Raises

RuntimeError
If the JSON flow file could not be found.

See Also

T2.flow_file_json()
Return the filename of the JSON flow file.
T2.flows_json()
Return Tranalyzer2 _flows.json file contents as an array of JSON objects.

Examples

>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'jsonSink'])
>>> t2.build()
>>> t2.run()
>>> t2.print_flows_json()
{"dir":"A","flowInd":1,...}
{"dir":"B","flowInd":1,...}
...
Expand source code
def print_flows_json(self):
    """Dump the Tranalyzer2 `_flows.json` file to stdout.

    The file to print is the one returned by `T2.flow_file_json()`.

    Raises
    ------
    RuntimeError
        If the JSON flow file could not be found.

    See Also
    --------
    T2.flow_file_json : Return the filename of the JSON flow file.
    T2.flows_json : Return Tranalyzer2 `_flows.json` file contents as an array of JSON objects.

    Examples
    --------
    >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'jsonSink'])
    >>> t2.build()
    >>> t2.run()
    >>> t2.print_flows_json()
    {"dir":"A","flowInd":1,...}
    {"dir":"B","flowInd":1,...}
    ...
    """
    path = self.flow_file_json()
    if not (path and isfile(path)):
        raise RuntimeError('Run Tranalyzer with the jsonSink plugin to create a JSON flow file first')
    with open(path) as handle:
        contents = handle.read()
    print(contents)
def print_flows_txt(self)

Print the contents of Tranalyzer2 _flows.txt file.

Raises

RuntimeError
If the flow file could not be found.

See Also

T2.flow_file_txt()
Return the filename of the TXT flow file.
T2.flows_txt()
Return Tranalyzer2 _flows.txt file contents as an array of JSON objects.

Examples

>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
>>> t2.build()
>>> t2.run()
>>> t2.print_flows_txt()
%dir    flowInd ...
A       1       ...
B       1       ...
...
Expand source code
def print_flows_txt(self):
    """Dump the Tranalyzer2 `_flows.txt` file to stdout.

    The file to print is the one returned by `T2.flow_file_txt()`.

    Raises
    ------
    RuntimeError
        If the flow file could not be found.

    See Also
    --------
    T2.flow_file_txt : Return the filename of the TXT flow file.
    T2.flows_txt : Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objects.

    Examples
    --------
    >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
    >>> t2.build()
    >>> t2.run()
    >>> t2.print_flows_txt()
    %dir    flowInd ...
    A       1       ...
    B       1       ...
    ...
    """
    path = self.flow_file_txt()
    if not (path and isfile(path)):
        raise RuntimeError('Run Tranalyzer with the txtSink plugin to create a flow file first')
    with open(path) as handle:
        contents = handle.read()
    print(contents)
def print_headers(self)

Print the contents of Tranalyzer2 _headers.txt file.

Raises

RuntimeError
If the header file could not be found.

See Also

T2.headers_file()
Return the filename of the headers file.
T2.headers()
Return Tranalyzer2 _headers.txt file contents.

Examples

>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
>>> t2.build()
>>> t2.run()
>>> t2.print_headers()
# Date: 1628090651.486362 sec (Wed 04 Aug 2021 17:24:11 CEST)
# Tranalyzer 0.8.11 (Anteater), Tarantula.
...
Expand source code
def print_headers(self):
    """Dump the Tranalyzer2 `_headers.txt` file to stdout.

    The file to print is the one returned by `T2.headers_file()`.

    Raises
    ------
    RuntimeError
        If the header file could not be found.

    See Also
    --------
    T2.headers_file : Return the filename of the headers file.
    T2.headers : Return Tranalyzer2 `_headers.txt` file contents.

    Examples
    --------
    >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
    >>> t2.build()
    >>> t2.run()
    >>> t2.print_headers()
    # Date: 1628090651.486362 sec (Wed 04 Aug 2021 17:24:11 CEST)
    # Tranalyzer 0.8.11 (Anteater), Tarantula.
    ...
    """
    path = self.headers_file()
    if not (path and isfile(path)):
        raise RuntimeError('Run Tranalyzer with the txtSink plugin to create a headers file first')
    with open(path) as handle:
        contents = handle.read()
    print(contents)
def print_log(self)

Print the contents of Tranalyzer2 _log.txt file.

Raises

RuntimeError
If the log file could not be found.

See Also

T2.print_report()
Alias for T2.print_log().
T2.log_file()
Return the filename of the log file.
T2.log()
Return Tranalyzer2 _log.txt file contents.

Examples

>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
>>> t2.build()
>>> t2.run()
>>> t2.print_log()
Expand source code
def print_log(self):
    """Dump the Tranalyzer2 `_log.txt` file to stdout.

    The file to print is the one returned by `T2.log_file()`.

    Raises
    ------
    RuntimeError
        If the log file could not be found.

    See Also
    --------
    T2.print_report : Alias for `T2.print_log()`.
    T2.log_file : Return the filename of the log file.
    T2.log : Return Tranalyzer2 `_log.txt` file contents.

    Examples
    --------
    >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
    >>> t2.build()
    >>> t2.run()
    >>> t2.print_log()
    """
    path = self.log_file()
    if not (path and isfile(path)):
        raise RuntimeError("Run Tranalyzer to create a log file first")
    with open(path) as handle:
        contents = handle.read()
    print(contents)
def print_packets(self)

Print the contents of Tranalyzer2 _packets.txt file.

Raises

RuntimeError
If the packet file could not be found.

See Also

T2.packet_file()
Return the filename of the packet file.
T2.packets()
Return Tranalyzer2 _packets.txt file contents as an array of JSON objects.

Examples

>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
>>> t2.build()
>>> t2.run(packet_mode=True)
>>> t2.print_packets()
%pktNo  flowInd ...
1       1       ...
2       1       ...
...
Expand source code
def print_packets(self):
    """Print the contents of Tranalyzer2 `_packets.txt` file.

    The file to print is the one returned by `T2.packet_file()`.

    Raises
    ------
    RuntimeError
        If `T2.packet_file()` yields no filename or the file does not exist.
        The message is the same as the one raised by `T2.packets()`.

    See Also
    --------
    T2.packet_file : Return the filename of the packet file.
    T2.packets : Return Tranalyzer2 `_packets.txt` file contents as an array of JSON objects.

    Examples
    --------
    >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
    >>> t2.build()
    >>> t2.run(packet_mode=True)
    >>> t2.print_packets()
    %pktNo  flowInd ...
    1       1       ...
    2       1       ...
    ...
    """
    name = self.packet_file()
    if not name or not isfile(name):
        # packet_mode is a run option, not a plugin: keep the wording identical to packets()
        raise RuntimeError("Run Tranalyzer with 'packet_mode=True' to create a packet file first")
    with open(name) as f:
        print(f.read())
def print_report(self)

Print the contents of Tranalyzer2 _log.txt file.

Raises

RuntimeError
If the log file could not be found.

See Also

T2.print_log()
Print the contents of Tranalyzer2 _log.txt file (T2.print_report() is an alias for it).
T2.log_file()
Return the filename of the log file.
T2.log()
Return Tranalyzer2 _log.txt file contents.
T2.report()
Alias for T2.log().

Examples

>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
>>> t2.build()
>>> t2.run()
>>> t2.print_report()
Expand source code
def print_report(self):
    """Print the contents of Tranalyzer2 `_log.txt` file (delegates to `T2.print_log()`).

    Raises
    ------
    RuntimeError
        If the log file could not be found.

    See Also
    --------
    T2.print_log : The method this alias delegates to.
    T2.log_file : Return the filename of the log file.
    T2.log : Return Tranalyzer2 `_log.txt` file contents.
    T2.report : Alias for `T2.log()`.

    Examples
    --------
    >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
    >>> t2.build()
    >>> t2.run()
    >>> t2.print_report()
    """
    # Pure alias: all the work happens in print_log().
    self.print_log()
def remove_plugin(self, plugin: str)

Remove a plugin from the list of plugins.

Parameters

plugin : str
The name of the plugin to remove.

See Also

T2.add_plugin()
Add a plugin to the list of plugins.
T2.add_plugins()
Add a list of plugins to the list of plugins.
T2.remove_plugins()
Remove a list of plugins from the list of plugins.

Examples

>>> t2 = T2(plugins=['arpDecode', 'basicStats'])
>>> t2.remove_plugin('arpDecode')
>>> t2.list_plugins()
['basicStats']
Expand source code
def remove_plugin(
        self,
        plugin: str
):
    """Drop a single plugin from the list of plugins.

    `self._remove_dict_entry()` is called first, then the plugin is removed
    from the internal plugin mapping.

    Parameters
    ----------
    plugin : str
        The name of the plugin to remove.

    See Also
    --------
    T2.add_plugin : Add a plugin to the list of plugins.
    T2.add_plugins : Add a list of plugins to the list of plugins.
    T2.remove_plugins : Remove a list of plugins from the list of plugins.

    Examples
    --------
    >>> t2 = T2(plugins=['arpDecode', 'basicStats'])
    >>> t2.remove_plugin('arpDecode')
    >>> t2.list_plugins()
    ['basicStats']
    """
    self._remove_dict_entry(plugin)
    del self._plugins[plugin]
def remove_plugins(self, plugins: List[str])

Remove a list of plugins from the list of plugins.

Parameters

plugins : list of str
The list of plugins to remove.

See Also

T2.add_plugin()
Add a plugin to the list of plugins.
T2.add_plugins()
Add a list of plugins to the list of plugins.
T2.remove_plugin()
Remove a plugin from the list of plugins.

Examples

>>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags'])
>>> t2.remove_plugins(['arpDecode', 'basicStats'])
>>> t2.list_plugins()
['tcpFlags']
Expand source code
def remove_plugins(
        self,
        plugins: List[str]
):
    """Drop several plugins from the list of plugins.

    Each name is passed, in order, to `T2.remove_plugin()`.

    Parameters
    ----------
    plugins : list of str
        The list of plugins to remove.

    See Also
    --------
    T2.add_plugin : Add a plugin to the list of plugins.
    T2.add_plugins : Add a list of plugins to the list of plugins.
    T2.remove_plugin : Remove a plugin from the list of plugins.

    Examples
    --------
    >>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags'])
    >>> t2.remove_plugins(['arpDecode', 'basicStats'])
    >>> t2.list_plugins()
    ['tcpFlags']
    """
    for name in plugins:
        self.remove_plugin(name)
def report(self) ‑> str

Return Tranalyzer2 _log.txt file contents.

Returns

str
The raw contents of the log file.

Raises

RuntimeError
If the _log.txt file could not be found.

See Also

T2.log_file()
Return the filename of the log file.
T2.log()
Alias for T2.report()

Examples

>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
>>> t2.build()
>>> t2.run()
>>> t2.report()
'================================================================================...'
Expand source code
def report(self) -> str:
    """Load the Tranalyzer2 `_log.txt` file and hand back its contents.

    Returns
    -------
    str
        The raw contents of the log file.

    Raises
    ------
    RuntimeError
        If `T2.log_file()` yields no filename or the file does not exist.

    See Also
    --------
    T2.log_file : Return the filename of the log file.
    T2.log : Alias for `T2.report()`

    Examples
    --------
    >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
    >>> t2.build()
    >>> t2.run()
    >>> t2.report()
    '================================================================================...'
    """
    path = self.log_file()
    if path and isfile(path):
        return self._load_file(path)
    raise RuntimeError("Run Tranalyzer to create a log file first")
def reset(self)

Reset all plugins' flags to their default value.

Examples

>>> t2 = T2(plugins=['arpDecode', 'txtSink'])
>>> t2.arpDecode.MAX_IP = 5
>>> t2.apply_config()
>>> t2.reset()
>>> t2.apply_config()
Expand source code
def reset(self):
    """Call `reset()` on every loaded plugin to restore its flags to their default value.

    Examples
    --------
    >>> t2 = T2(plugins=['arpDecode', 'txtSink'])
    >>> t2.arpDecode.MAX_IP = 5
    >>> t2.apply_config()
    >>> t2.reset()
    >>> t2.apply_config()
    """
    for loaded in self._plugins.values():
        loaded.reset()
def run(self, pcap: str = None, iface: str = None, pcap_list: Union[str, List[str]] = None, output_prefix: str = None, packet_mode: bool = False, plugins: List[str] = None, plugin_folder: str = None, loading_list: str = None, bpf: str = None, rebuild: bool = False, streaming: bool = False, timeout: int = None, verbose: bool = False)

Run Tranalyzer2.

Parameters

pcap : str, default: None
Path to a pcap file.
iface : str, default: None
Name of a network interface.
pcap_list : str or list of str, default: None
Path to a list of pcap files, e.g., '/tmp/myPcaps.txt'. Or list of path to pcap files, e.g., ['file1.pcap', 'file2.pcap'].
output_prefix : str, default: None
If None, automatically derived from input.
packet_mode : bool, default: False
Activate Tranalyzer2 packet mode.
plugin_folder : str, default: None
Path to the plugin folder.
plugins : list of str, default: None
The list of plugins to use. If None, use T2.list_plugins().
loading_list : str, default: None
Path to a plugin loading list. If None, create one using T2.list_plugins().
bpf : str, default: None
A BPF filter.
rebuild : bool, default: False
Apply the current changes and rebuild Tranalyzer2 and the plugins.
streaming : bool, default: False
If True, then T2.stream() can be used to yield one flow at a time.
timeout : int, default: None
Number of seconds after which to terminate the process. If None, run forever or until the end of file is reached.
verbose : bool, default: False
  • If True, print the output (stdout and stderr) of the command.
  • If False, do not print anything.

Raises

OSError
If specified interface iface does not exist locally.
RuntimeError
  • If none or more than one input (pcap, pcap_list, iface) is specified.
  • If socketSink plugin is not loaded or misconfigured when T2.streaming is True.
subprocess.CalledProcessError
If the process exited with a non-zero exit code.
subprocess.TimeoutExpired
If timeout was specified and the process ran longer than that.

Examples

>>> t2 = T2(pcap='/tmp/file.pcap', output_format='txt')
>>> t2.build()
>>> t2.run()
>>> t2.run(packet_mode=True)
Expand source code
def run(
        self,
        pcap: str = None,
        iface: str = None,
        pcap_list: Union[str, List[str]] = None,
        output_prefix: str = None,
        packet_mode: bool = False,
        plugins: List[str] = None,
        plugin_folder: str = None,
        loading_list: str = None,
        bpf: str = None,
        rebuild: bool = False,
        streaming: bool = False,
        timeout: int = None,
        verbose: bool = False
):
    """Run Tranalyzer2.

    Truthy arguments overwrite the corresponding attributes stored on the
    instance; falsy arguments leave the stored values untouched.

    Parameters
    ----------
    pcap : str, default: None
        Path to a pcap file.
    iface : str, default: None
        Name of a network interface.
    pcap_list : str or list of str, default: None
        Path to a list of pcap files, e.g., `'/tmp/myPcaps.txt'`.
        Or list of path to pcap files, e.g., `['file1.pcap', 'file2.pcap']`.
    output_prefix : str, default: None
        If `None`, automatically derived from input.
    packet_mode : bool, default: False
        Activate Tranalyzer2 packet mode.
        A `False` value does not deactivate a packet mode already set on the instance.
    plugin_folder : str, default: None
        Path to the plugin folder.
    plugins : list of str, default: None
        The list of plugins to use.
        If `None` (and no `loading_list` is given), use the currently loaded plugins.
    loading_list : str, default: None
        Path to a plugin loading list.
        If `None`, create one using `T2.list_plugins()`.
    bpf : str, default: None
        A BPF filter.
    rebuild : bool, default: False
        Apply the current changes and rebuild Tranalyzer2 and the plugins.
    streaming: bool, default: False
        If `True`, then `T2.stream()` can be used to yield one flow at a time.
        In this mode, Tranalyzer2 is started in a background thread and this
        function returns without waiting for it.
    timeout : int, default: None
        Number of seconds after which to terminate the process.
        If `None`, run forever or until the end of file is reached.
    verbose : bool, default: False

        - If `True`, print the output (`stdout` and `stderr`) of the command.
        - If `False`, do not print anything.

    Raises
    ------
    OSError
        If specified interface `iface` does not exist locally.
    RuntimeError

        - If none or more than one input (`pcap`, `pcap_list`, `iface`) is specified.
        - If socketSink plugin is not loaded or misconfigured when `T2.streaming` is `True`.

    subprocess.CalledProcessError
        If the process exited with a non-zero exit code.
    subprocess.TimeoutExpired
        If `timeout` was specified and the process ran longer than that.

    Examples
    --------
    >>> t2 = T2(pcap='/tmp/file.pcap', output_format='txt')
    >>> t2.build()
    >>> t2.run()
    >>> t2.run(packet_mode=True)
    """
    # Only the arguments of this call are checked for mutual exclusion;
    # inputs already stored on the instance are not considered here.
    if (pcap and pcap_list) or (pcap and iface) or (iface and pcap_list):
        raise RuntimeError('Only one input (pcap, pcap_list, iface) may be specified')

    # At least one input must come from either the arguments or the instance.
    if (
        not pcap and not self.pcap and
        not iface and not self.iface and
        not pcap_list and not self.pcap_list
    ):
        raise RuntimeError('An input (pcap, pcap_list or iface) is required')

    # Remember the input for subsequent calls.
    # NOTE(review): an input of another kind previously stored on the instance
    # is not cleared here - confirm how T2Utils.run_tranalyzer handles that.
    if pcap:
        self.pcap = pcap
    elif iface:
        self.iface = iface
    elif pcap_list:
        self.pcap_list = pcap_list

    if output_prefix:
        self.output_prefix = output_prefix

    if plugin_folder:
        self.plugin_folder = plugin_folder

    # A loading list takes precedence; otherwise fall back to the loaded plugins
    # when no explicit list of plugins was given.
    if loading_list:
        self.loading_list = loading_list
    elif not plugins:
        plugins = list(self._plugins.keys())

    # Packet mode can only be switched on from here, never off.
    if packet_mode:
        self.packet_mode = packet_mode

    if bpf:
        self.bpf = bpf

    # Streaming requires socketSink with the exact configuration checked below.
    if streaming or self.streaming:
        if 'socketSink' not in self._plugins:
            raise RuntimeError('socketSink plugin is required for running in streaming mode')
        elif (self.socketSink.SOCKTYPE != 1 or
              self.socketSink.CONTENT_TYPE != 2 or
              self.socketSink.GZ_COMPRESS != 0):
            raise RuntimeError("Invalid configuration for socketSink in streaming mode: requires 'SOCKTYPE=1', 'CONTENT_TYPE=2' and 'GZ_COMPRESS=0'")
        else:
            self.streaming = True

    if rebuild:
        if not self.loading_list:
            # No loading list: apply and build through this object.
            self.apply_changes()
            self.build()
        else:
            # With a loading list: rebuild the core, then apply the configuration
            # of every entry of the list (one entry per line) and build the list.
            self.tranalyzer2.apply_changes()
            self.tranalyzer2.build()
            with open(self.loading_list) as f:
                for plugin in f:
                    T2Utils.apply_config(plugin.strip())
            self.build(plugin=self.loading_list)

    # Defined as a closure so that it can be used as a thread target while
    # still seeing the local 'plugins', 'timeout' and 'verbose' values.
    def _run_tranalyzer(self):
        T2Utils.run_tranalyzer(
            t2_exec=self.t2_exec,
            pcap=self.pcap,
            iface=self.iface,
            pcap_list=self.pcap_list,
            output_prefix=self.output_prefix,
            log_file=True,
            packet_mode=self.packet_mode,
            plugin_folder=self.plugin_folder,
            bpf=self.bpf,
            loading_list=self.loading_list,
            plugins=plugins,
            timeout=timeout,
            verbose=verbose)

    if not self.streaming:
        # Blocking run: returns (or raises) once run_tranalyzer is done.
        _run_tranalyzer(self)
    else:
        # The streaming setup happens before Tranalyzer2 is started.
        # NOTE(review): exceptions raised inside the thread are not propagated
        # to the caller of run().
        self._setup_streaming()
        thread = threading.Thread(target=_run_tranalyzer, args=(self,))
        thread.daemon = True  # stop thread when run_tranalyzer returns
        thread.start()
def set_plugins(self, plugins: List[str] = None)

Overwrite the current list of plugins.

Parameters

plugins : list of str, default: None
The list of plugins to use. If None, empty the list of plugins.

See Also

T2.list_plugins()
Alphabetically list the currently active plugins.

Examples

>>> t2 = T2()
>>> t2.set_plugins(['arpDecode', 'basicStats'])
>>> t2.list_plugins()
['arpDecode', 'basicStats']
>>> type(t2.arpDecode)
<class 't2py.T2Plugin.T2Plugin'>
>>> t2.basicStats.description
'Basic statistics'
Expand source code
def set_plugins(
        self,
        plugins: List[str] = None
):
    """Replace the current list of plugins with `plugins`.

    The existing plugins are always cleared first (`T2.clear_plugins()`).

    Parameters
    ----------
    plugins : list of str, default: None
        The list of plugins to use.
        If `None` (or empty), the list of plugins is left empty.

    See Also
    --------
    T2.list_plugins : Alphabetically list the currently active plugins.

    Examples
    --------
    >>> t2 = T2()
    >>> t2.set_plugins(['arpDecode', 'basicStats'])
    >>> t2.list_plugins()
    ['arpDecode', 'basicStats']
    >>> type(t2.arpDecode)
    <class 't2py.T2Plugin.T2Plugin'>
    >>> t2.basicStats.description
    'Basic statistics'
    """
    self.clear_plugins()
    if not plugins:
        return
    loaded = {}
    for name in plugins:
        loaded[name] = T2Plugin(name)
    self._plugins = loaded
    self._add_dict_entries(plugins)
def status(self)

Print the current configuration status of Tranalyzer2 and the plugins (sorted by plugin numbers).

Examples

>>> t2 = T2(pcap='/tmp/file.pcap', output_format='csv.gz')
>>> t2.status()
Expand source code
def status(self):
    """Print the current configuration status of Tranalyzer2 and the plugins (sorted by plugin numbers).

    Three sections are printed:

    - the pending changes of Tranalyzer2 (only if there are any),
    - the plugins, each with its pending changes and a `[NOT IN LOADING LIST]`
      marker when a non-empty whitelist does not contain it,
    - the Tranalyzer2 command line options derived from the current settings.

    Examples
    --------
    >>> t2 = T2(pcap='/tmp/file.pcap', output_format='csv.gz')
    >>> t2.status()
    """
    if self._tranalyzer2:
        t2 = self.tranalyzer2
        pending_changes = len(t2.changes)
        if pending_changes > 0:
            print(f'Tranalyzer2 [{pending_changes} change{"s" if pending_changes > 1 else ""} pending]:')
            for name, value in t2.changes.items():
                print(f'    {name} = {value}')
            print()

    print('Plugins: ')
    ordered = sorted(self._plugins, key=lambda name: self._plugins[name].number)
    for index, plugin_name in enumerate(ordered, start=1):
        plugin = self._plugins[plugin_name]
        pending_changes = len(plugin.changes)
        # Build the whole line first: both markers are optional and independent.
        line = f'    {index}: {plugin_name}'
        if pending_changes > 0:
            line += f' [{pending_changes} change{"s" if pending_changes > 1 else ""} pending]'
        if len(self._whitelisted_plugins) > 0 and plugin_name not in self._whitelisted_plugins:
            line += ' [NOT IN LOADING LIST]'
        print(line)
        # Nothing is printed here when there are no pending changes.
        for name, value in plugin.changes.items():
            print(f'            {name} = {value}')
    if not ordered:
        print('    None')

    print('\nTranalyzer options:')
    if self.pcap:
        print(f'    -r {self.pcap}')
    elif self.iface:
        print(f'    -i {self.iface}')
    elif self.pcap_list:
        # isinstance() also recognises subclasses of list
        if isinstance(self.pcap_list, list):
            # presumably the file the list of pcaps gets written to - verify against T2Utils
            print('    -R /tmp/pcap_list.txt')
        else:
            print(f'    -R {self.pcap_list}')
    if self.output_prefix:
        print(f'    -w {self.output_prefix}')
    if self.loading_list:
        print(f'    -b {self.loading_list}')
    if self.plugin_folder:
        print(f'    -p {self.plugin_folder}')
    if self.packet_mode:
        print('    -s')
    print('    -l')
    if self.bpf:
        print(f'    BPF filter: {self.bpf}')
def stream(self) ‑> Iterator[Dict[str, Any]]

Iterator returning flow by flow.

Raises

RuntimeError
  • If the function was called, without configuring streaming mode first.
  • If socketSink did not open connection.

Yields

dict
The next flow represented as a dict with the column names as key.

Examples

>>> t2 = T2(streaming=True, plugins=['basicFlow'])
>>> t2.run(pcap='file.pcap')
>>> for flow in t2.stream():
...     print(f"{flow['srcIP']}:{flow['srcPort']} -> {flow['dstIP']}:{flow['dstPort']}")
Expand source code
def stream(self) -> Iterator[Dict[str, Any]]:
    """Iterator returning flow by flow.

    Reads newline-delimited JSON records from the socketSink connection
    (`self._conn`) in chunks of 1024 bytes and yields each complete line
    decoded with `json.loads`. Iteration stops as soon as `recv` returns no
    data, i.e., when the peer has closed the connection. Bytes received
    after the last newline at that point form an unterminated record and
    are discarded.

    Raises
    ------
    RuntimeError

        - If the function was called without configuring streaming mode first.
        - If socketSink did not open a connection.

    Yields
    ------
    dict
        The next flow represented as a dict with the column names as key.

    Examples
    --------
    >>> t2 = T2(streaming=True, plugins=['basicFlow'])
    >>> t2.run(pcap='file.pcap')
    >>> for flow in t2.stream():
    ...     print(f"{flow['srcIP']}:{flow['srcPort']} -> {flow['dstIP']}:{flow['dstPort']}")
    """
    if not self.streaming:
        raise RuntimeError("Calling 'stream' without configuring streaming mode first.")

    if not self._conn:
        time.sleep(2)  # wait for tranalyzer to start and socketSink to connect

    if not self._conn:
        raise RuntimeError('socketSink did not open connection')

    buf = b''
    while True:
        chunk = self._conn.recv(1024)
        if not chunk:
            # End of stream. The test must be on the chunk, not on the
            # accumulated buffer: a leftover partial line would otherwise
            # keep the buffer non-empty and the loop would never terminate.
            break
        buf += chunk
        lines = buf.split(b'\n')
        # Everything before the last newline is a complete record.
        for line in lines[:-1]:
            yield json.loads(line)
        # Keep the (possibly empty) unterminated tail for the next chunk.
        buf = lines[-1]
def to_pandas(self, infile: str = None, delimiter: str = None) ‑> pandas.core.frame.DataFrame

Convert the flow file or infile to pandas DataFrame.

Parameters

infile : str
Path to a flow or packet file.
delimiter : str, default: None
Field delimiter used in the input file.

Returns

pd.DataFrame
DataFrame holding the tabular data stored in infile.

Raises

RuntimeError
If infile is not given and no flow file is available.

Examples

>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
>>> t2.build()
>>> t2.run()
>>> df = t2.to_pandas()
>>> type(df)
<class 'pandas.core.frame.DataFrame'>
Expand source code
def to_pandas(
        self,
        infile: str = None,
        delimiter: str = None
) -> "pandas.core.frame.DataFrame":
    """Load a flow or packet file into a pandas `DataFrame`.

    The file to load is `infile` when provided, otherwise the value
    returned by `self.flow_file()`. The actual conversion is delegated to
    `T2Utils.to_pandas()`.

    Parameters
    ----------
    infile : str, default: None
        Path to a flow or packet file.
        If empty or `None`, fall back to `self.flow_file()`.
    delimiter : str, default: None
        Field delimiter used in the input file
        (passed unchanged to `T2Utils.to_pandas()`).

    Returns
    -------
    pd.DataFrame
        DataFrame holding the tabular data stored in the selected file.

    Raises
    ------
    RuntimeError
        If no `infile` was given and `self.flow_file()` did not return one
        either.

    Examples
    --------
    >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'])
    >>> t2.build()
    >>> t2.run()
    >>> df = t2.to_pandas()
    >>> type(df)
    <class 'pandas.core.frame.DataFrame'>
    """
    # An explicit argument wins; the flow file is only queried as a fallback.
    source = infile or self.flow_file()
    if source:
        return T2Utils.to_pandas(source, delimiter)
    raise RuntimeError('Run Tranalyzer with the txtSink or jsonSink plugin to create a flow file first')
def unload(self, plugin: Union[str, List[str]] = None, plugin_folder: str = None, verbose: bool = False)

Remove (unload) plugin(s) from the plugin folder.

Parameters

plugin : str or list of str, default: None

plugin can be:

  • a single plugin, e.g., 'pluginName'
  • a list of plugins, e.g., ['pluginName1', 'pluginName2']
  • a plugin loading list, e.g., 'myPlugins.txt'
  • all to apply the operation to all available plugins
  • If None, use T2.list_plugins().
plugin_folder : str, default: None
The plugin folder from where the plugin must be unloaded. If None, use T2.plugin_folder.
verbose : bool, default: False
  • If True, print the output (stdout and stderr) of the command.
  • If False, do not print anything.

Raises

subprocess.CalledProcessError
If the process exited with a non-zero exit code.

Examples

>>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags'])
>>> t2.build()
>>> t2.unload()
Expand source code
def unload(
        self,
        plugin: Union[str, List[str]] = None,
        plugin_folder: str = None,
        verbose: bool = False
):
    """Unload (remove) one or more plugins from a plugin folder.

    Thin wrapper around `T2Utils.unload()` which fills in defaults from
    this object when arguments are omitted.

    Parameters
    ----------
    plugin : str or list of str, default: None
        What to unload. `plugin` can be:

        - a single plugin, e.g., `'pluginName'`
        - a list of plugins, e.g., `['pluginName1', 'pluginName2']`
        - a plugin loading list, e.g., `'myPlugins.txt'`
        - `all` to apply the operation to all available plugins
        - If empty or `None`, the keys of `self._plugins` are used.

    plugin_folder : str, default: None
        The plugin folder from where the plugin must be unloaded.
        If empty or `None`, use `T2.plugin_folder`.
    verbose : bool, default: False

        - If `True`, print the output (`stdout` and `stderr`) of the command.
        - If `False`, do not print anything.

    Raises
    ------
    subprocess.CalledProcessError
        If the process exited with a non-zero exit code
        (propagated from `T2Utils.unload()`).

    Examples
    --------
    >>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags'])
    >>> t2.build()
    >>> t2.unload()
    """
    # Fall back to the folder configured on this object.
    if not plugin_folder:
        plugin_folder = self.plugin_folder
    # Without an explicit selection, operate on the plugins tracked by this object.
    if not plugin:
        plugin = list(self._plugins.keys())
    T2Utils.unload(plugin, plugin_folder, verbose=verbose)