Module t2py.T2Utils
Classes
class T2Utils
-
Simple wrapper around Tranalyzer2 scripts and utilities.
Expand source code
class T2Utils: """Simple wrapper around Tranalyzer2 scripts and utilities.""" T2HOME: str = normpath(join(dirname(realpath(__file__)), '..', '..')) """The path to Tranalyzer home folder.""" T2PLHOME: str = join(T2HOME, 'plugins') """The path to Tranalyzer plugins folder, i.e., `$T2HOME/plugins`.""" T2BUILD: str = join(T2HOME, 'autogen.sh') """The path to the `autogen.sh` script, i.e., `$T2HOME/autogen.sh`.""" T2CONF: str = join(T2HOME, 'scripts', 't2conf', 't2conf') """The path to the `t2conf` script, i.e., `$T2HOME/scripts/t2conf/t2conf`.""" T2FM: str = join(T2HOME, 'scripts', 't2fm', 't2fm') """The path to the `t2fm` script, i.e., `$T2HOME/scripts/t2fm/t2fm`.""" T2PLUGIN: str = join(T2HOME, 'scripts', 't2plugin') """The path to the `t2plugin` script, i.e., `$T2HOME/scripts/t2plugin`.""" TAWK: str = join(T2HOME, 'scripts', 'tawk', 'tawk') """The path to the `tawk` script, i.e., `$T2HOME/scripts/tawk/tawk`.""" _all_plugins: List[str] = None @staticmethod def t2_exec( debug: bool = False ) -> str: """Return the path to Tranalyzer2 release or debug executable. Parameters ---------- debug : bool, default: False - If `True`, returns the path to the debug executable. - If `False`, returns the path to the release executable. Returns ------- str The path to Tranalyzer2 executable. Examples -------- >>> T2Utils.t2_exec() /home/user/tranalyzer2-0.9.0/tranalyzer2/build/tranalyzer >>> T2Utils.t2_exec(debug=True) /home/user/tranalyzer2-0.9.0/tranalyzer2/debug/tranalyzer """ folder: str = 'debug' if debug else 'build' return join(T2Utils.T2HOME, 'tranalyzer2', folder, 'tranalyzer') @staticmethod def run_tranalyzer( pcap: str = None, iface: str = None, pcap_list: Union[str, List[str]] = None, output_prefix: str = None, log_file: bool = False, save_monitoring: bool = False, packet_mode: bool = False, plugin_folder: str = None, loading_list: str = None, plugins: List[str] = None, bpf: str = None, t2_exec: str = None, timeout: int = None, verbose: bool = False ): """Run Tranalyzer2. Parameters ---------- pcap : str, default: None Path to a pcap file. iface : str, default: None Name of a network interface. pcap_list : str or list of str, default: None - Path to a list of pcap files, e.g., `'/tmp/myPcaps.txt'`. - Or list of path to pcap files, e.g., `['file1.pcap', 'file2.pcap']`. output_prefix : str, default: None If `None`, automatically derived from input. log_file : bool, default: False Save the final report in a `_log.txt` file. save_monitoring : bool, default: False Save the monitoring report in a `_monitoring.txt` file. packet_mode : bool, default: False Activate Tranalyzer2 packet mode. plugin_folder : str, default: None Path to the plugin folder. loading_list : str, default: None Path to a plugin loading list. If set, the `plugins` parameter is ignored. plugins : list of str, default: None The list of plugins to use. If `None`, load the plugins available in the plugin folder. bpf : str, default: None A BPF filter. t2_exec : str, default: None Path to the Tranalyzer2 executable. If `None`, use `T2Utils.t2_exec()` timeout : int, default: None Number of seconds after which to terminate the process. If `None`, run forever or until the end of file is reached. verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ OSError If specified interface `iface` does not exist locally. RuntimeError If none or more than one input (`pcap`, `pcap_list`, `iface`) is specified. subprocess.CalledProcessError If the process exited with a non-zero exit code. subprocess.TimeoutExpired If `timeout` was expired and its value reached. Examples -------- >>> T2Utils.run_tranalyzer(pcap='/tmp/file.pcap', output_prefix='/tmp/') """ if sum(bool(i) for i in (pcap, pcap_list, iface)) > 1: raise RuntimeError('Only one input (pcap, pcap_list, iface) may be specified') if not pcap and not iface and not pcap_list: raise RuntimeError('An input (pcap, pcap_list or iface) is required') if not t2_exec: t2_exec = T2Utils.t2_exec() if not t2_exec or not isfile(t2_exec): raise RuntimeError(f'Tranalyzer2 executable could not be found at {t2_exec}') cmd = [t2_exec] # Input arguments: -r pcap or -R pcap_list or -i iface if pcap: cmd.extend(['-r', pcap]) elif pcap_list: if isinstance(pcap_list, list): T2Utils.create_pcap_list(pcap_list, '/tmp/pcap_list.txt') pcap_list = '/tmp/pcap_list.txt' cmd.extend(['-R', pcap_list]) elif iface: if iface not in T2Utils.network_interfaces(): raise OSError(f'Interface {iface} does not exist locally') cmd.extend(['-i', iface]) # Output arguments: -w prefix / -l / -m / -s if output_prefix: cmd.extend(['-w', output_prefix]) if log_file: cmd.append('-l') if save_monitoring: cmd.append('-m') if packet_mode: cmd.append('-s') # Optional arguments if not loading_list and plugins: loading_list = '/tmp/plugins.load' T2Utils.create_plugin_list(plugins, outfile=loading_list, verbose=verbose) if loading_list: cmd.extend(['-b', loading_list]) if plugin_folder: cmd.extend(['-p', plugin_folder]) if bpf: cmd.append(bpf) subprocess.run(cmd, capture_output=not verbose, check=True, timeout=timeout) @staticmethod def list_config( plugin: str ) -> List[str]: """List the configuration flags available for `plugin`. Parameters ---------- plugin : str The name of a plugin Returns ------- list of str The list of configuration flags available for the plugin. Raises ------ NameError If the plugin could not be found. subprocess.CalledProcessError If the process exited with a non-zero exit code. Notes ----- Equivalent to: `$ t2conf pluginName -I`. Examples -------- >>> T2Utils.list_config('arpDecode') ['MAX_IP'] """ if plugin not in T2Utils.valid_plugin_names(): raise NameError(f'Plugin {plugin} could not be found') cmd = [T2Utils.T2CONF, '-y', plugin, '-I'] config = subprocess.run(cmd, text=True, check=True, stdout=subprocess.PIPE).stdout.strip() all_flags = config.split('\n') if config else [] flags = [f.strip() for f in all_flags] if len(flags) == 1 and flags[0] == f'No configuration flags found for {plugin}': flags = [] return flags @staticmethod def get_config( plugin: str, name: str, infile: str = None ) -> Any: """Get the value of a define from a configuration or header file. Parameters ---------- plugin : str The name of a plugin. name : str The name of a configuration flag. infile : str, default: None Specify from which file (absolute path) to get the value from, e.g., `'$T2PLHOME/pluginName/pluginName.cfg'`. Use the special value `'default'` to get the value from `default.config`. If `'source'` or `None`, get the value from the header file. Raises ------ NameError If the plugin could not be found. subprocess.CalledProcessError If the process exited with a non-zero exit code. See Also -------- T2Utils.get_config_from_source : Alias for `T2Utils.get_config(plugin, name, 'source')` T2Utils.get_default : Alias for `T2Utils.get_config(plugin, name, 'default')`. Notes ----- Equivalent to: `$ t2conf pluginName -G name [-g infile]`. Examples -------- >>> T2Utils.get_config('arpDecode', 'MAX_IP') 10 """ if plugin not in T2Utils.valid_plugin_names(): raise NameError(f'Plugin {plugin} could not be found') cmd = [T2Utils.T2CONF, '-y', plugin, '-G', name] if infile and infile != 'source': cmd.extend(['-g', infile]) config = subprocess.run(cmd, text=True, check=True, stdout=subprocess.PIPE).stdout val = config.split('=')[1].strip() if config else None if val: if val == 'no': val = 0 elif val == 'yes': val = 1 elif val.lstrip('-').isdigit(): val = int(val) elif val.lstrip('-').replace('.', '', 1).isdigit(): val = float(val) elif val.startswith('"'): val = val.strip('"') elif val.startswith("'"): val = val.strip("'") return val @staticmethod def get_default( plugin: str, name: str ) -> Any: """Get the default value of a define. Parameters ---------- plugin : str The name of a plugin. name : str The name of a configuration flag. Raises ------ NameError If the plugin could not be found. See Also -------- T2Utils.get_config : Get the value of a define from a configuration or header file. T2Utils.get_config_from_source : Get the value of a define from a header file. Notes ----- Equivalent to: `$ t2conf pluginName -G name -g default`. Alias for `T2Utils.get_config(plugin, name, infile='default')`. Examples -------- >>> T2Utils.get_default('arpDecode', 'MAX_IP') 10 """ return T2Utils.get_config(plugin, name, 'default') @staticmethod def get_config_from_source( plugin: str, name: str ) -> Any: """Get the value of a define from the header file, e.g., `pluginName.h`. Parameters ---------- plugin : str The name of a plugin. name : str The name of a configuration flag. Raises ------ NameError If the plugin could not be found. See Also -------- T2Utils.get_config : Get the value of a define from a configuration or header file. T2Utils.get_default : Get the default value of a define. Notes ----- Equivalent to: `$ t2conf pluginName -G name`. Alias for `T2Utils.get_config(plugin, name, infile='source')`. Examples -------- >>> T2Utils.get_config_from_source('arpDecode', 'MAX_IP') 10 """ return T2Utils.get_config(plugin, name, 'source') @staticmethod def set_config( plugin: str, flag_name: Union[str, Dict[str, Any]], flag_value: Any = None, outfile: str = None, verbose: bool = False ): """Set the value of a define in a configuration or header file. Parameters ---------- plugin : str The name of a plugin. flag_name : str or dict of str, Any The name of the configuration flag to set or a dictionary of key/value to set. flag_value : Any, default: None The new value for `flag_name`. Use `'default'` to reset the value to its default. Not required if `flag_name` is a dictionary of key/value to set. outfile : str, default: None Use `'source'` to set the value in the header file. verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ NameError If the plugin could not be found. subprocess.CalledProcessError If the process exited with a non-zero exit code. Notes ----- Equivalent to: `$ t2conf pluginName -D name1=value1 [-D name2=value2 ...] [-g outfile]`. Examples -------- >>> T2Utils.set_config('arpDecode', 'MAX_IP', 25) """ if plugin not in T2Utils.valid_plugin_names(): raise NameError(f'Plugin {plugin} could not be found') name_val = {flag_name: flag_value} if isinstance(flag_name, str) else flag_name for name, value in name_val.items(): current = T2Utils.get_config(plugin, name) # Make sure value is properly quoted if isinstance(current, str): if current[0] == '"' and current[-1] == '"': if value[0] != '"': value = '"' + value if value[-1] != '"': value += '"' cmd = [T2Utils.T2CONF, '-y', plugin, '-D', f'{name}={value}'] if outfile and outfile != 'source': cmd.extend(['-g', outfile]) subprocess.run(cmd, capture_output=not verbose, check=True) @staticmethod def set_default( plugin: Union[str, List[str]], name: Union[str, List[str]] = None, outfile: str = None, verbose: bool = False ): """Reset the value of a define to its default. Parameters ---------- plugin : str or list of str `plugin` can be: - a single plugin, e.g., `'pluginName'` - a list of plugins, e.g., `['pluginName1', 'pluginName2']` - `all` to apply the operation to all available plugins name : str or list of str, default: None The name of a configuration flag to reset. If `name` is set, only reset the value for the specified flag(s). outfile : str, default: None Path to a configuration file where the changes will be rest. Use `outfile='source'` to set the value in the header file. verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ NameError If a plugin could not be found. NotImplementedError If trying to reset specific flag(s) with multiple plugins. subprocess.CalledProcessError If the process exited with a non-zero exit code. See Also -------- T2Utils.set_config : Set the value of a define in a configuration or header file. T2Utils.reset_config : Reset all configuration flags from a plugin to their default values. Notes ----- Equivalent to: `$ t2conf pluginName -D name=default [-g outfile]`. Alias for `T2Utils.set_config(plugin, name, 'default', outfile)`. Examples -------- >>> T2Utils.set_default('arpDecode', 'MAX_IP') >>> T2Utils.set_default('basicStats', ['BS_VAR', 'BS_STDDEV']) >>> T2Utils.set_default(['arpDecode', 'basicStats']) """ if name: if isinstance(plugin, list): raise NotImplementedError('Cannot reset specific flag(s) with list of plugins') if plugin == 'all': raise NotImplementedError("Cannot reset specific flag(s) for 'all' plugins") if isinstance(name, list): for n in name: T2Utils.set_config(plugin, n, 'default', outfile=outfile, verbose=verbose) else: T2Utils.set_config(plugin, name, 'default', outfile=outfile, verbose=verbose) else: cmd = [T2Utils.T2CONF, '-y', '--reset'] accept_outfile = False if plugin == 'all': cmd.append('-a') elif isinstance(plugin, list): cmd.extend(plugin) else: accept_outfile = True cmd.append(plugin) if outfile and outfile != 'source': if not accept_outfile: raise NotImplementedError("Cannot reset configuration in specific file " "with list of plugins or 'all' plugins") cmd.extend(['-g', outfile]) subprocess.run(cmd, capture_output=not verbose, check=True) @staticmethod def reset_config( plugin: Union[str, List[str]], name: Union[str, List[str]] = None, outfile: str = None, verbose: bool = False ): """Reset all configuration flags from a plugin to their default values. Parameters ---------- plugin : str or list of str `plugin` can be: - a single plugin, e.g., `'pluginName'` - a list of plugins, e.g., `['pluginName1', 'pluginName2']` name : str, default: None The name of a configuration flag to reset. If `name` is set, only reset the value for the specified flag(s). outfile : str, default: None Path to a configuration file where the changes will be rest. Use `outfile='source'` to reset the value in the header file. verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ NameError If a plugin could not be found. NotImplementedError If trying to reset specific flag(s) with multiple plugins. subprocess.CalledProcessError If the process exited with a non-zero exit code. See Also -------- T2Utils.set_default : Set the value of a define in a configuration or header file. Notes ----- Equivalent to: `$ t2conf pluginName --reset [-g outfile]`. Alias for `T2Utils.set_default(plugin, None, outfile)`. Examples -------- >>> T2Utils.reset_config('arpDecode') >>> T2Utils.reset_config('arpDecode', 'MAX_IP') >>> T2Utils.reset_config(['arpDecode', 'txtSink']) >>> T2Utils.reset_config('arpDecode', 'MAX_IP', '/tmp/arpDecode.config') """ T2Utils.set_default(plugin, name, outfile=outfile, verbose=verbose) @staticmethod def generate_config( plugin: str, outfile: str = None, verbose: bool = False ): """Generate a configuration file for `plugin`. Parameters ---------- plugin : str The name of a plugin. outfile : str, default: None The filename where to save the configuration. If `None`, default to `'pluginName.config'` in the plugin home folder. verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ NameError If the plugin could not be found. subprocess.CalledProcessError If the process exited with a non-zero exit code. See Also -------- T2Utils.apply_config : Apply a configuration file to the sources. Notes ----- Equivalent to: `$ t2conf pluginName -g [file]`. Examples -------- >>> T2Utils.generate_config('arpDecode') >>> T2Utils.generate_config('arpDecode', '/tmp/arpDecode.config') """ if plugin not in T2Utils.valid_plugin_names(): raise NameError(f'Plugin {plugin} could not be found') cmd = [T2Utils.T2CONF, '-y', plugin, '-g'] if outfile: cmd.append(outfile) subprocess.run(cmd, capture_output=not verbose, check=True) @staticmethod def apply_config( plugin: str, infile: str = None, verbose: bool = False ): """Apply a configuration file to the sources. Parameters ---------- plugin : str The name of a plugin. infile : str, default: None The name of the configuration file to apply. If `None`, default to `'pluginName.config'` in the plugin home folder. verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ NameError If the plugin could not be found. subprocess.CalledProcessError If the process exited with a non-zero exit code. See Also -------- T2Utils.generate_config : Generate configuration files for plugins Notes ----- Equivalent to: `$ t2conf pluginName -C [file|auto]`. Examples -------- >>> T2Utils.apply_config('arpDecode') >>> T2Utils.apply_config('arpDecode', '/tmp/arpDecode.config') """ if plugin not in T2Utils.valid_plugin_names(): raise NameError(f'Plugin {plugin} could not be found') cmd = [T2Utils.T2CONF, '-y', plugin, '-C'] cmd.append(infile if infile else 'auto') subprocess.run(cmd, capture_output=not verbose, check=True) @staticmethod def build( plugin: Union[str, List[str]], plugin_folder: str = None, force_rebuild: bool = False, debug: bool = False, verbose: bool = False ): """Build a plugin or Tranalyzer2. Parameters ---------- plugin : str or list of str `plugin` can be: - a single plugin, e.g., `'pluginName'` - a list of plugins, e.g., `['pluginName1', 'pluginName2']` - a plugin loading list, e.g., `'myPlugins.txt'` - `all` to apply the operation to all available plugins plugin_folder : str, default: None Path to the plugin folder. If `None`, default to `'$HOME/.tranalyzer/plugins'`. force_rebuild : bool, default: False Force the rebuild of the configuration files (`$ t2build -r ...`). debug : bool, default: False Build the plugin(s) in debug mode. verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ subprocess.CalledProcessError If the process exited with a non-zero exit code. See Also -------- T2Utils.clean : Remove files generated by `T2Utils.build()`. Notes ----- Equivalent to: - `$ t2build pluginName` - `$ t2build pluginName1 pluginName2` - `$ t2build -b myPlugins.txt` - `$ t2build -b -a` Examples -------- >>> T2Utils.build('arpDecode') >>> T2Utils.build(['arpDecode', 'txtSink']) >>> T2Utils.build('/tmp/myPlugins.load') >>> T2Utils.build('all') """ cmd = [T2Utils.T2BUILD, '-y'] if debug: cmd.append('-d') if force_rebuild: cmd.append('-r') if plugin_folder: cmd.extend(['-p', plugin_folder]) if 'tranalyzer2' in plugin: cmd.append('-i') if plugin == 'all': cmd.append('-a') elif isinstance(plugin, list): cmd.extend(plugin) elif isfile(plugin): cmd.extend(['-b', plugin]) else: cmd.append(plugin) subprocess.run(cmd, capture_output=not verbose, check=True) @staticmethod def clean( plugin: Union[str, List[str]], verbose: bool = False ): """Remove files generated by `T2Utils.build()`. Parameters ---------- plugin : str or list of str `plugin` can be: - a single plugin, e.g., `'pluginName'` - a list of plugins, e.g., `['pluginName1', 'pluginName2']` - a plugin loading list, e.g., `'myPlugins.txt'` - `all` to apply the operation to all available plugins verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ subprocess.CalledProcessError If the process exited with a non-zero exit code. See Also -------- T2Utils.build : Build a plugin or Tranalyzer2. Notes ----- Equivalent to: - `$ t2build -c pluginName` - `$ t2build -c pluginName1 pluginName2` - `$ t2build -c -b myPlugins.txt` - `$ t2build -c -a` Examples -------- >>> T2Utils.clean('arpDecode') >>> T2Utils.clean(['arpDecode', 'txtSink']) >>> T2Utils.clean('/tmp/myPlugins.load') >>> T2Utils.clean('all') """ cmd = [T2Utils.T2BUILD, '-y', '-c'] if plugin == 'all': cmd.append('-a') elif isinstance(plugin, list): cmd.extend(plugin) elif isfile(plugin): cmd.extend(['-b', plugin]) else: cmd.append(plugin) subprocess.run(cmd, capture_output=not verbose, check=True) @staticmethod def unload( plugin: Union[str, List[str]], plugin_folder: str = None, verbose: bool = False ): """Remove (unload) a plugin from the plugin folder. Parameters ---------- plugin : str or list of str `plugin` can be: - a single plugin, e.g., `'pluginName'` - a list of plugins, e.g., `['pluginName1', 'pluginName2']` - a plugin loading list, e.g., `'myPlugins.txt'` - `all` to apply the operation to all available plugins plugin_folder : str, default: None The plugin folder from where the plugin must be unloaded. If `None`, use the default plugin folder (`'$HOME/.tranalyzer/plugins'`). verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ subprocess.CalledProcessError If the process exited with a non-zero exit code. Notes ----- Equivalent to: - `$ t2build -u pluginName` - `$ t2build -u pluginName1 pluginName2` - `$ t2build -u -b myPlugins.txt` - `$ t2build -u -a` Examples -------- >>> T2Utils.unload('arpDecode') >>> T2Utils.unload(['arpDecode', 'txtSink']) >>> T2Utils.unload('/tmp/myPlugins.load') >>> T2Utils.unload('all') """ cmd = [T2Utils.T2BUILD, '-y', '-u'] if plugin_folder: cmd.extend(['-p', plugin_folder]) if plugin == 'all': cmd.append('-a') elif isinstance(plugin, list): cmd.extend(plugin) elif isfile(plugin): cmd.extend(['-b', plugin]) else: cmd.append(plugin) subprocess.run(cmd, capture_output=not verbose, check=True) @staticmethod def plugin_description( plugin: str ) -> str: """Return a short description of a plugin. Parameters ---------- plugin : str The name of a plugin. Returns ------- str The description of the plugin. Raises ------ NameError If the plugin could not be found. subprocess.CalledProcessError If the process exited with a non-zero exit code. Notes ----- Equivalent to: `$ t2plugin -l | awk '$1 == "plugin" { sub("^" $1 FS "+" $2 FS "+", ""); print }'`. Examples -------- >>> T2Utils.plugin_description('arpDecode') 'Address Resolution Protocol (ARP)' """ if plugin not in T2Utils.valid_plugin_names(): raise NameError(f'Plugin {plugin} could not be found') cmd = [T2Utils.T2PLUGIN, '-l'] out = subprocess.run(cmd, text=True, check=True, stdout=subprocess.PIPE).stdout for line in out.splitlines(): cols = line.split() if len(cols) > 2 and cols[0] == plugin: return ' '.join(cols[2:]) return None @staticmethod def plugin_number( plugin: str ) -> str: """Return the plugin number of a plugin. Parameters ---------- plugin : str The name of a plugin. Returns ------- str The plugin number. Raises ------ NameError If the plugin could not be found. subprocess.CalledProcessError If the process exited with a non-zero exit code. Notes ----- Equivalent to: `$ t2plugin -l | awk '$1 == "pluginName" { print $2; exit }'` Examples -------- >>> T2Utils.plugin_number('arpDecode') '179' """ if plugin not in T2Utils.valid_plugin_names(): raise NameError(f'Plugin {plugin} could not be found') cmd = [T2Utils.T2PLUGIN, '-l'] out = subprocess.run(cmd, text=True, check=True, stdout=subprocess.PIPE).stdout for line in out.splitlines(): cols = line.split() if len(cols) > 2 and cols[0] == plugin: return cols[1] return None @staticmethod def plugins( category: Union[str, List[str]] = None ) -> List[str]: """Alphabetically List all the available plugins. Parameters ---------- category : str or list of str, default: None `category` can be used to specify the types of plugins to list: - `'g'` : global - `'b'` : basic - `'l2'`: layer 2 - `'l4'`: layer 3/4 - `'l7'`: layer 7 - `'a'` : application - `'m'` : math - `'c'` : classifier - `'o'` : output - `None`: all Returns ------- list of str List of all available plugins. Raises ------ subprocess.CalledProcessError If the process exited with a non-zero exit code. Examples -------- >>> T2Utils.plugins() >>> T2Utils.plugins('g') ['protoStats'] >>> T2Utils.plugins(['g', 'l2']) ['arpDecode', 'cdpDecode', 'lldpDecode', 'protoStats', 'stpDecode', 'vtpDecode'] """ cmd = [T2Utils.T2PLUGIN, '-H', '-N'] if isinstance(category, list): for c in category: cmd.append(f'-l={c}') elif category: cmd.append(f'-l={category}') elif T2Utils._all_plugins: return T2Utils._all_plugins else: cmd.append('-l') out = subprocess.run(cmd, text=True, check=True, stdout=subprocess.PIPE).stdout sorted_list = sorted(out.splitlines()) if not category: T2Utils._all_plugins = sorted_list return sorted_list @staticmethod def list_plugins( infile: str = None ) -> List[str]: """List the active plugins in `infile`. Parameters ---------- infile : str, default: None Name of a plugin loading list. If `None`, default to `'plugins.load'` in the plugin folder. Returns ------- list of str List of active plugins in `infile`. Raises ------ subprocess.CalledProcessError If the process exited with a non-zero exit code. Notes ----- Equivalent to: `$ t2conf -S [file]`. Examples -------- >>> T2Utils.list_plugins() ['arpDecode', 'txtSink'] >>> T2Utils.list_plugins('/tmp/myPlugins.load') ['arpDecode', 'txtSink'] """ cmd = [T2Utils.T2CONF, '-y', '-S'] if infile: cmd.append(infile) out = subprocess.run(cmd, text=True, check=True, stdout=subprocess.PIPE).stdout return sorted(out.splitlines()) @staticmethod def create_pcap_list( pcaps: List[str], outfile: str = None ): """Create a loading list of pcaps. Parameters ---------- pcaps : list of str List of pcap files. outfile : str, default: None Name of the file to save. If `None`, default to `/tmp/pcap_list.txt'`. Notes ----- Equivalent to: `$ t2caplist pcaps > outfile`. Examples -------- >>> T2Utils.create_pcap_list(['file1.pcap', 'file2.pcap']) >>> T2Utils.create_pcap_list(['file1.pcap', 'file2.pcap'], '/tmp/myPcaps.txt') """ if not outfile: outfile = '/tmp/pcap_list.txt' with open(outfile, 'w', encoding='utf-8') as f: for pcap in pcaps: f.write(pcap + '\n') @staticmethod def create_plugin_list( plugins: List[str], outfile: str = None, verbose: bool = False ): """Create a loading list of plugins. Parameters ---------- plugins : list of str List of plugin names. outfile : str, default: None Name of the file to save. If `None`, default to `'plugins.load'` in the plugin folder. verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ subprocess.CalledProcessError If the process exited with a non-zero exit code (if `plugins is not None`). Notes ----- Equivalent to: `$ t2conf -L plugin1 plugin2 ...`. Examples -------- >>> T2Utils.create_plugin_list(['arpDecode', 'txtSink']) >>> T2Utils.create_plugin_list(['arpDecode', 'txtSink'], '/tmp/myPlugins.txt') """ if outfile: directory = dirname(outfile) if not isdir(directory): makedirs(directory) # simpler format, only active plugins with open(outfile, 'w', encoding='utf-8') as f: for plugin in plugins: f.write(plugin + '\n') else: cmd = [T2Utils.T2CONF, '-y', '-L'] cmd.extend(plugins) subprocess.run(cmd, capture_output=not verbose, check=True) @staticmethod def tawk( program: str = None, filename: str = None, options: List[str] = None ) -> str: """Call `tawk` with the `program`, `options` and `filename` specified. Parameters ---------- program : str, default: None A `tawk` program, e.g., `"$dstPort == 80 { print tuple5() }"`. filename : str, default: None Name of the file to analyze. options : list of str, default: None List of options to pass to `tawk`. Returns ------- str The result of the `tawk` command. Raises ------ RuntimeError If `filename` was specified, but the file does not exist. subprocess.CalledProcessError If the process exited with a non-zero exit code. Notes ----- Equivalent to: `$ tawk [options] 'program' filename`. Examples -------- >>> result = T2Utils.tawk(None, None, ['-V', 'flowStat=0x1234']) >>> print(result) >>> T2Utils.tawk('host("1.2.3.4") { aggr(proto()) }', 'file.txt') """ if filename and not isfile(filename): raise RuntimeError(f"Input file '{filename}' does not exist") cmd = [T2Utils.TAWK] if program: cmd.append(program) if options: cmd.extend(options) if filename: cmd.append(filename) result = subprocess.run(cmd, capture_output=True, check=True) result = result.stdout.decode('utf8').strip() return result @staticmethod def follow_stream( filename: str, flow: int, output_format: Union[int, str] = 2, direction: str = None, payload_format: Union[int, str] = 4, reassembly: bool = True, colors: bool = True ) -> Union[str, List[str], List[bytearray], List[Dict[str, Any]]]: """Call `tawk` `follow_stream()` function with the specified options. Return the reassembled payload of flow with index `flow` and direction `direction`. Parameters ---------- filename : str Name of the file to analyze. flow : int Index (`flowInd`) of the flow to analyze. output_format : int or str, default: 2 The format to use for the output: - 0 or `'payload_only'` - 1 or `'info'`: Prefix each payload with packet/flow info - 2 or `'json'` - 3 or `'reconstruct'` direction : str, default: None The direction to follow (`'A'` or `'B'`). Use `None` to follow both directions. payload_format : int or str, default: 4 The format to use for the payload: - 0 or 'ascii' - 1 or 'hexdump' - 2, 'raw' or 'binary' - 3 or 'base64' - 4 or 'bytearray' reassembly : bool, default: True Analyze TCP sequence numbers, reassemble and reorder TCP segments. colors : bool, default: True Output colors. Returns ------- list of bytearray If `output_format` is 0 or 2 and `payload_format` is 4 or `'bytearray'`. list of dict If `output_format` is 2. list of str If `output_format` is 0 and `payload_format` is not 4 or `'bytearray'`. str If none of the above applied. Raises ------ RuntimeError - If `filename` could not be found. - If `output_format` or `payload_format` was out of bound. - If `direction` was not `'A'`, `'B'` or `None`. - If `output_format` was 3 or `'reconstruct'` and `direction` was not `'A'` or `'B'`. - If `payload_format` was 4 or `'bytearray'` and `output_format` was 1 or `'info'`. subprocess.CalledProcessError If the process exited with a non-zero exit code. See Also -------- T2Utils.tawk : Call `tawk` with the `program`, `options` and `filename` specified. Notes ----- Equivalent to: `$ tawk 'follow_stream(flow, output_format, direction, payload_format, not reassembly, not colors)' filename`. Examples -------- >>> result = T2Utils.follow_stream(1, payload_format='hexdump', direction='B') >>> print(result) """ if not filename: raise RuntimeError("Input file != None is required") if direction: if direction not in ('A', 'B'): raise RuntimeError(f"Invalid value for 'direction' parameter: '{direction}'") direction = f'"{direction}"' if output_format in (0, 'payload_only'): output_format = 0 elif output_format in (1, 'info'): output_format = 1 elif output_format in (2, 'json'): output_format = 2 elif output_format in (3, 'reconstruct'): output_format = 3 else: raise RuntimeError(f"Invalid value for 'output_format' parameter: '{output_format}'") if output_format == 3 and direction is None: raise RuntimeError("Payload reconstruction (output format 3 or 'reconstruct') " "requires a direction ('A' or 'B').") if payload_format in (0, 'ascii'): payload = 0 elif payload_format in (1, 'hexdump'): payload = 1 elif payload_format in (2, 'raw', 'binary'): payload = 2 elif payload_format in (4, 'bytearray'): if output_format == 1: raise RuntimeError("'payload_format' 4 or 'bytearray' cannot be specified with " "'output_format' 1 or 'info'") payload = 2 colors = False elif payload_format in (3, 'base64'): payload = 3 else: raise RuntimeError(f"Invalid value for 'payload_format' parameter: '{payload_format}'") prog = ( 'follow_stream(' f'{flow},' f'{output_format},' f'{direction if direction else 0},' f'{payload},' f'{0 if reassembly else 1},' f'{0 if colors else 1}' ')' ) result = T2Utils.tawk(prog, filename) if output_format == 0: result = result.split('\n') if result else [] elif output_format == 2: result = result.split('\n') if result else [] result = [json.loads(s) if s else {} for s in result] if payload_format in (4, 'bytearray'): if output_format == 3: result = bytearray.fromhex(result) elif output_format == 0: result = [bytearray.fromhex(s) for s in result] elif output_format == 2: for packet in result: packet['payload'] = bytearray.fromhex(packet['payload']) return result @staticmethod def load_plugins( plugin: Union[str, List[str]] = None ): """Load plugins as `T2Plugin` objects. The plugins will be available as, e.g., `T2Utils.pluginName`. Parameters ---------- plugin : str or list of str, default: None Name of the plugin(s) to load. `plugin` can be: - a plugin name - a list of plugin names - `None` or 'all' (slow) Raises ------ NameError If a plugin could not be found. Examples -------- >>> T2Utils.load_plugins('arpDecode') >>> T2Utils.load_plugins(['arpDecode', 'txtSink']) >>> T2Utils.load_plugins() >>> T2Utils.arpDecode <t2py.T2Plugin.T2Plugin object at 0x16055a370> """ from .T2Plugin import T2Plugin all_plugins = T2Utils.valid_plugin_names() if isinstance(plugin, list): plugins = plugin elif not plugin or plugin == 'all': plugins = all_plugins else: plugins = [plugin] for p in plugins: if p not in all_plugins: raise NameError(f'Plugin {p} could not be found') setattr(T2Utils, p, T2Plugin(p)) @staticmethod def _tsv_to_list( filename: str, delimiter: str = '\t' ) -> List[Dict[str, Any]]: """Convert a tsv/csv file to a python list of JSON objects. Parameters ---------- filename : str The name of the file to load. delimiter : str, default: '\\t' The delimiter used to separate columns in the input file. Returns ------- list of dict A list of dictionary, whose entries are the column names and their associated values. """ data = [] with open(filename, encoding='utf-8') as f: reader = csv.DictReader(f, delimiter=delimiter) reader.fieldnames[0] = reader.fieldnames[0].lstrip('%').strip() for flow in reader: data.append(flow) return data @staticmethod def to_json_array( infile: str, delimiter: str = '\t' ) -> List[Dict[str, Any]]: """Convert a flow or packet file to an array of JSON objects. Parameters ---------- infile : str Path to a flow or packet file. delimiter : str, default: '\\t' Field delimiter used in the input file. Returns ------- list of dict The list of rows contained in `infile`. Each row is represented as a dict with the column names as key. Raises ------ json.decoder.JSONDecodeError If `infile` was a JSON file and the decoding process failed. Examples -------- >>> flows = T2Utils.to_json_array('/tmp/file_flows.txt') >>> flows [{'dir': 'A', 'flowInd': 1, ...}, {'dir': 'B', 'flowInd': 1, ...}, ...] """ data = [] if not infile.endswith('.json'): data = T2Utils._tsv_to_list(infile, delimiter) else: with open(infile, encoding='utf-8') as f: for flow in f: if flow in ('[\n', ']\n'): continue data.append(json.loads(flow.strip(','))) return data @staticmethod def to_pandas( infile: str, delimiter: str = None ) -> "pandas.core.frame.DataFrame": """Convert a flow or packet file to pandas `DataFrame`. Parameters ---------- infile : str Path to a flow or packet file. delimiter : str, default: None Field delimiter used in the input file. Returns ------- pd.DataFrame DataFrame holding the tabular data stored in `infile`. Examples -------- >>> df = T2Utils.to_pandas('/tmp/file_flows.txt') >>> type(df) <class 'pandas.core.frame.DataFrame'> """ import pandas as pd if infile.endswith('.json'): return pd.DataFrame(T2Utils.to_json_array(infile)) return pd.read_table(infile, delimiter=delimiter) @staticmethod def to_pdf( pcap: str = None, flow_file: str = None, prefix: str = None, config: bool = True, reset_config: bool = True, open_pdf: bool = False, verbose: bool = False ): """Generate a PDF/LaTeX report from a PCAP or Tranalyzer flow file." Parameters ---------- pcap : str, default: None PCAP file to analyze. file : str, default: None Flow file to analyze. prefix : str, default: None Append 'prefix' to any output file produced. config : bool, default: True If `True`, configure and build Tranalyzer2 and the plugins as required. reset_config : bool, default: True If `True`, reset tranalyzer2 and the plugins configuration at the end. open_pdf : bool, default: False If `True`, automatically open the generated PDF. verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ RuntimeError - If one of the required parameter (pcap or flow_file) was not specified. - If multiple inputs (pcap and flow_file) were specified. subprocess.CalledProcessError If the process exited with a non-zero exit code. Notes ----- Equivalent to: `$ t2fm [-b] [-A] [-r pcap|-F file]`. Examples -------- >>> T2Utils.to_pdf(pcap='file.pcap') >>> T2Utils.to_pdf(flow_file='file_flows.txt') """ if not pcap and not flow_file: raise RuntimeError('One of (pcap, flow_file) must be specified.') if pcap and flow_file: raise RuntimeError('Only one of (pcap, flow_file) must be specified.') cmd = [T2Utils.T2FM, '-y'] # Input if pcap: cmd.extend(['-r', pcap]) elif flow_file: cmd.extend(['-F', flow_file]) # Output if prefix: cmd.extend(['-w', prefix]) # Options if config: cmd.append('-b') if reset_config: cmd.append('--reset') if open_pdf: cmd.append('-A') subprocess.run(cmd, capture_output=not verbose, check=True) @staticmethod def network_interfaces() -> List[str]: """List the names of all available network interfaces. Returns ------- list of str The list of available network interfaces. Examples -------- >>> T2Utils.network_interfaces() ['lo0', 'eth0'] """ return [iface[1] for iface in socket.if_nameindex()] @staticmethod def valid_plugin_names() -> List[str]: """Return a list of valid plugin names. Returns ------- list of str The list of valid plugin names. Examples -------- >>> T2Utils.valid_plugin_names() ['arpDecode', ..., 'tranalyzer2', ...] """ valid_plugins = ['tranalyzer2'] valid_plugins.extend(T2Utils.plugins()) return sorted(valid_plugins)
Class variables
var T2BUILD : str
-
The path to the
autogen.sh
script, i.e.,$T2HOME/autogen.sh
. var T2CONF : str
-
The path to the
t2conf
script, i.e.,$T2HOME/scripts/t2conf/t2conf
. var T2FM : str
-
The path to the
t2fm
script, i.e.,$T2HOME/scripts/t2fm/t2fm
. var T2HOME : str
-
The path to Tranalyzer home folder.
var T2PLHOME : str
-
The path to Tranalyzer plugins folder, i.e.,
$T2HOME/plugins
. var T2PLUGIN : str
-
The path to the
t2plugin
script, i.e.,$T2HOME/scripts/t2plugin
. var TAWK : str
-
The path to the
tawk
script, i.e.,$T2HOME/scripts/tawk/tawk
.
Static methods
def apply_config(plugin: str, infile: str = None, verbose: bool = False)
-
Apply a configuration file to the sources.
Parameters
plugin
:str
- The name of a plugin.
infile
:str
, default: None
-
The name of the configuration file to apply.
If
None
, default to'pluginName.config'
in the plugin home folder. verbose
:bool
, default: False
-
- If
True
, print the output (stdout
andstderr
) of the command. - If
False
, do not print anything.
- If
Raises
NameError
- If the plugin could not be found.
subprocess.CalledProcessError
- If the process exited with a non-zero exit code.
See Also
T2Utils.generate_config()
- Generate configuration files for plugins
Notes
Equivalent to:
$ t2conf pluginName -C [file|auto]
.Examples
>>> T2Utils.apply_config('arpDecode') >>> T2Utils.apply_config('arpDecode', '/tmp/arpDecode.config')
def build(plugin: Union[str, List[str]], plugin_folder: str = None, force_rebuild: bool = False, debug: bool = False, verbose: bool = False)
-
Build a plugin or Tranalyzer2.
Parameters
plugin
:str
orlist
ofstr
-
plugin
can be:- a single plugin, e.g.,
'pluginName'
- a list of plugins, e.g.,
['pluginName1', 'pluginName2']
- a plugin loading list, e.g.,
'myPlugins.txt'
all
to apply the operation to all available plugins
- a single plugin, e.g.,
plugin_folder
:str
, default: None
-
Path to the plugin folder.
If
None
, default to'$HOME/.tranalyzer/plugins'
. force_rebuild
:bool
, default: False
- Force the rebuild of the configuration files (
$ t2build -r ...
). debug
:bool
, default: False
- Build the plugin(s) in debug mode.
verbose
:bool
, default: False
-
- If
True
, print the output (stdout
andstderr
) of the command. - If
False
, do not print anything.
- If
Raises
subprocess.CalledProcessError
- If the process exited with a non-zero exit code.
See Also
T2Utils.clean()
- Remove files generated by
T2Utils.build()
.
Notes
Equivalent to:
$ t2build pluginName
$ t2build pluginName1 pluginName2
$ t2build -b myPlugins.txt
$ t2build -b -a
Examples
>>> T2Utils.build('arpDecode') >>> T2Utils.build(['arpDecode', 'txtSink']) >>> T2Utils.build('/tmp/myPlugins.load') >>> T2Utils.build('all')
def clean(plugin: Union[str, List[str]], verbose: bool = False)
-
Remove files generated by
T2Utils.build()
.Parameters
plugin
:str
orlist
ofstr
-
plugin
can be:- a single plugin, e.g.,
'pluginName'
- a list of plugins, e.g.,
['pluginName1', 'pluginName2']
- a plugin loading list, e.g.,
'myPlugins.txt'
all
to apply the operation to all available plugins
- a single plugin, e.g.,
verbose
:bool
, default: False
-
- If
True
, print the output (stdout
andstderr
) of the command. - If
False
, do not print anything.
- If
Raises
subprocess.CalledProcessError
- If the process exited with a non-zero exit code.
See Also
T2Utils.build()
- Build a plugin or Tranalyzer2.
Notes
Equivalent to:
$ t2build -c pluginName
$ t2build -c pluginName1 pluginName2
$ t2build -c -b myPlugins.txt
$ t2build -c -a
Examples
>>> T2Utils.clean('arpDecode') >>> T2Utils.clean(['arpDecode', 'txtSink']) >>> T2Utils.clean('/tmp/myPlugins.load') >>> T2Utils.clean('all')
def create_pcap_list(pcaps: List[str], outfile: str = None)
-
Create a loading list of pcaps.
Parameters
pcaps
:list
ofstr
- List of pcap files.
outfile
:str
, default: None
-
Name of the file to save.
If
None
, default to/tmp/pcap_list.txt'
.
Notes
Equivalent to:
$ t2caplist pcaps > outfile
.Examples
>>> T2Utils.create_pcap_list(['file1.pcap', 'file2.pcap']) >>> T2Utils.create_pcap_list(['file1.pcap', 'file2.pcap'], '/tmp/myPcaps.txt')
def create_plugin_list(plugins: List[str], outfile: str = None, verbose: bool = False)
-
Create a loading list of plugins.
Parameters
plugins
:list
ofstr
- List of plugin names.
outfile
:str
, default: None
-
Name of the file to save.
If
None
, default to'plugins.load'
in the plugin folder. verbose
:bool
, default: False
-
- If
True
, print the output (stdout
andstderr
) of the command. - If
False
, do not print anything.
- If
Raises
subprocess.CalledProcessError
- If the process exited with a non-zero exit code (if
plugins is not None
).
Notes
Equivalent to:
$ t2conf -L plugin1 plugin2 ...
.Examples
>>> T2Utils.create_plugin_list(['arpDecode', 'txtSink']) >>> T2Utils.create_plugin_list(['arpDecode', 'txtSink'], '/tmp/myPlugins.txt')
def follow_stream(filename: str, flow: int, output_format: Union[int, str] = 2, direction: str = None, payload_format: Union[int, str] = 4, reassembly: bool = True, colors: bool = True) ‑> Union[str, List[str], List[bytearray], List[Dict[str, Any]]]
-
Call
tawk
follow_stream()
function with the specified options.Return the reassembled payload of flow with index
flow
and directiondirection
.Parameters
filename
:str
- Name of the file to analyze.
flow
:int
- Index (
flowInd
) of the flow to analyze. output_format
:int
orstr
, default: 2
-
The format to use for the output:
- 0 or
'payload_only'
- 1 or
'info'
: Prefix each payload with packet/flow info - 2 or
'json'
- 3 or
'reconstruct'
- 0 or
direction
:str
, default: None
-
The direction to follow (
'A'
or'B'
).Use
None
to follow both directions. payload_format
:int
orstr
, default: 4
-
The format to use for the payload:
- 0 or 'ascii'
- 1 or 'hexdump'
- 2, 'raw' or 'binary'
- 3 or 'base64'
- 4 or 'bytearray'
reassembly
:bool
, default: True
- Analyze TCP sequence numbers, reassemble and reorder TCP segments.
colors
:bool
, default: True
- Output colors.
Returns
list
ofbytearray
- If
output_format
is 0 or 2 andpayload_format
is 4 or'bytearray'
. list
ofdict
- If
output_format
is 2. list
ofstr
- If
output_format
is 0 andpayload_format
is not 4 or'bytearray'
. str
- If none of the above applied.
Raises
RuntimeError
-
- If
filename
could not be found. - If
output_format
orpayload_format
was out of bound. - If
direction
was not'A'
,'B'
orNone
. - If
output_format
was 3 or'reconstruct'
anddirection
was not'A'
or'B'
. - If
payload_format
was 4 or'bytearray'
andoutput_format
was 1 or'info'
.
- If
subprocess.CalledProcessError
- If the process exited with a non-zero exit code.
See Also
T2Utils.tawk()
- Call
tawk
with theprogram
,options
andfilename
specified.
Notes
Equivalent to:
$ tawk 'follow_stream(flow, output_format, direction, payload_format, not reassembly, not colors)' filename
.Examples
>>> result = T2Utils.follow_stream(1, payload_format='hexdump', direction='B') >>> print(result)
def generate_config(plugin: str, outfile: str = None, verbose: bool = False)
-
Generate a configuration file for
plugin
.Parameters
plugin
:str
- The name of a plugin.
outfile
:str
, default: None
-
The filename where to save the configuration.
If
None
, default to'pluginName.config'
in the plugin home folder. verbose
:bool
, default: False
-
- If
True
, print the output (stdout
andstderr
) of the command. - If
False
, do not print anything.
- If
Raises
NameError
- If the plugin could not be found.
subprocess.CalledProcessError
- If the process exited with a non-zero exit code.
See Also
T2Utils.apply_config()
- Apply a configuration file to the sources.
Notes
Equivalent to:
$ t2conf pluginName -g [file]
.Examples
>>> T2Utils.generate_config('arpDecode') >>> T2Utils.generate_config('arpDecode', '/tmp/arpDecode.config')
def get_config(plugin: str, name: str, infile: str = None) ‑> Any
-
Get the value of a define from a configuration or header file.
Parameters
plugin
:str
- The name of a plugin.
name
:str
- The name of a configuration flag.
infile
:str
, default: None
-
Specify from which file (absolute path) to get the value from, e.g.,
'$T2PLHOME/pluginName/pluginName.cfg'
.Use the special value
'default'
to get the value fromdefault.config
.If
'source'
orNone
, get the value from the header file.
Raises
NameError
- If the plugin could not be found.
subprocess.CalledProcessError
- If the process exited with a non-zero exit code.
See Also
T2Utils.get_config_from_source()
- Alias for
T2Utils.get_config(plugin, name, 'source')
T2Utils.get_default()
- Alias for
T2Utils.get_config(plugin, name, 'default')
.
Notes
Equivalent to:
$ t2conf pluginName -G name [-g infile]
.Examples
>>> T2Utils.get_config('arpDecode', 'MAX_IP') 10
def get_config_from_source(plugin: str, name: str) ‑> Any
-
Get the value of a define from the header file, e.g.,
pluginName.h
.Parameters
plugin
:str
- The name of a plugin.
name
:str
- The name of a configuration flag.
Raises
NameError
- If the plugin could not be found.
See Also
T2Utils.get_config()
- Get the value of a define from a configuration or header file.
T2Utils.get_default()
- Get the default value of a define.
Notes
Equivalent to:
$ t2conf pluginName -G name
.Alias for
T2Utils.get_config(plugin, name, infile='source')
.Examples
>>> T2Utils.get_config_from_source('arpDecode', 'MAX_IP') 10
def get_default(plugin: str, name: str) ‑> Any
-
Get the default value of a define.
Parameters
plugin
:str
- The name of a plugin.
name
:str
- The name of a configuration flag.
Raises
NameError
- If the plugin could not be found.
See Also
T2Utils.get_config()
- Get the value of a define from a configuration or header file.
T2Utils.get_config_from_source()
- Get the value of a define from a header file.
Notes
Equivalent to:
$ t2conf pluginName -G name -g default
.Alias for
T2Utils.get_config(plugin, name, infile='default')
.Examples
>>> T2Utils.get_default('arpDecode', 'MAX_IP') 10
def list_config(plugin: str) ‑> List[str]
-
List the configuration flags available for
plugin
.Parameters
plugin
:str
- The name of a plugin
Returns
list
ofstr
- The list of configuration flags available for the plugin.
Raises
NameError
- If the plugin could not be found.
subprocess.CalledProcessError
- If the process exited with a non-zero exit code.
Notes
Equivalent to:
$ t2conf pluginName -I
.Examples
>>> T2Utils.list_config('arpDecode') ['MAX_IP']
def list_plugins(infile: str = None) ‑> List[str]
-
List the active plugins in
infile
.Parameters
infile
:str
, default: None
-
Name of a plugin loading list.
If
None
, default to'plugins.load'
in the plugin folder.
Returns
list
ofstr
List of active plugins in
infile
.Raises
subprocess.CalledProcessError
- If the process exited with a non-zero exit code.
Notes
Equivalent to:
$ t2conf -S [file]
.Examples
>>> T2Utils.list_plugins() ['arpDecode', 'txtSink'] >>> T2Utils.list_plugins('/tmp/myPlugins.load') ['arpDecode', 'txtSink']
def load_plugins(plugin: Union[str, List[str]] = None)
-
Load plugins as
T2Plugin
objects.The plugins will be available as, e.g.,
T2Utils.pluginName
.Parameters
plugin
:str
orlist
ofstr
, default: None
-
Name of the plugin(s) to load.
plugin
can be:- a plugin name
- a list of plugin names
None
or 'all' (slow)
Raises
NameError
- If a plugin could not be found.
Examples
>>> T2Utils.load_plugins('arpDecode') >>> T2Utils.load_plugins(['arpDecode', 'txtSink']) >>> T2Utils.load_plugins() >>> T2Utils.arpDecode <t2py.T2Plugin.T2Plugin object at 0x16055a370>
def network_interfaces() ‑> List[str]
-
List the names of all available network interfaces.
Returns
list
ofstr
- The list of available network interfaces.
Examples
>>> T2Utils.network_interfaces() ['lo0', 'eth0']
def plugin_description(plugin: str) ‑> str
-
Return a short description of a plugin.
Parameters
plugin
:str
- The name of a plugin.
Returns
str
- The description of the plugin.
Raises
NameError
- If the plugin could not be found.
subprocess.CalledProcessError
- If the process exited with a non-zero exit code.
Notes
Equivalent to:
$ t2plugin -l | awk '$1 == "plugin" { sub("^" $1 FS "+" $2 FS "+", ""); print }'
.Examples
>>> T2Utils.plugin_description('arpDecode') 'Address Resolution Protocol (ARP)'
def plugin_number(plugin: str) ‑> str
-
Return the plugin number of a plugin.
Parameters
plugin
:str
- The name of a plugin.
Returns
str
- The plugin number.
Raises
NameError
- If the plugin could not be found.
subprocess.CalledProcessError
- If the process exited with a non-zero exit code.
Notes
Equivalent to:
$ t2plugin -l | awk '$1 == "pluginName" { print $2; exit }'
Examples
>>> T2Utils.plugin_number('arpDecode') '179'
def plugins(category: Union[str, List[str]] = None) ‑> List[str]
-
Alphabetically List all the available plugins.
Parameters
category
:str
orlist
ofstr
, default: None
-
category
can be used to specify the types of plugins to list:'g'
: global'b'
: basic'l2'
: layer 2'l4'
: layer 3/4'l7'
: layer 7'a'
: application'm'
: math'c'
: classifier'o'
: outputNone
: all
Returns
list
ofstr
- List of all available plugins.
Raises
subprocess.CalledProcessError
- If the process exited with a non-zero exit code.
Examples
>>> T2Utils.plugins() >>> T2Utils.plugins('g') ['protoStats'] >>> T2Utils.plugins(['g', 'l2']) ['arpDecode', 'cdpDecode', 'lldpDecode', 'protoStats', 'stpDecode', 'vtpDecode']
def reset_config(plugin: Union[str, List[str]], name: Union[str, List[str]] = None, outfile: str = None, verbose: bool = False)
-
Reset all configuration flags from a plugin to their default values.
Parameters
plugin
:str
orlist
ofstr
-
plugin
can be:- a single plugin, e.g.,
'pluginName'
- a list of plugins, e.g.,
['pluginName1', 'pluginName2']
- a single plugin, e.g.,
name
:str
, default: None
-
The name of a configuration flag to reset.
If
name
is set, only reset the value for the specified flag(s). outfile
:str
, default: None
-
Path to a configuration file where the changes will be rest.
Use
outfile='source'
to reset the value in the header file. verbose
:bool
, default: False
-
- If
True
, print the output (stdout
andstderr
) of the command. - If
False
, do not print anything.
- If
Raises
NameError
- If a plugin could not be found.
NotImplementedError
- If trying to reset specific flag(s) with multiple plugins.
subprocess.CalledProcessError
- If the process exited with a non-zero exit code.
See Also
T2Utils.set_default()
- Set the value of a define in a configuration or header file.
Notes
Equivalent to:
$ t2conf pluginName --reset [-g outfile]
.Alias for
T2Utils.set_default()(plugin, None, outfile)
.Examples
>>> T2Utils.reset_config('arpDecode') >>> T2Utils.reset_config('arpDecode', 'MAX_IP') >>> T2Utils.reset_config(['arpDecode', 'txtSink']) >>> T2Utils.reset_config('arpDecode', 'MAX_IP', '/tmp/arpDecode.config')
def run_tranalyzer(pcap: str = None, iface: str = None, pcap_list: Union[str, List[str]] = None, output_prefix: str = None, log_file: bool = False, save_monitoring: bool = False, packet_mode: bool = False, plugin_folder: str = None, loading_list: str = None, plugins: List[str] = None, bpf: str = None, t2_exec: str = None, timeout: int = None, verbose: bool = False)
-
Run Tranalyzer2.
Parameters
pcap
:str
, default: None
- Path to a pcap file.
iface
:str
, default: None
- Name of a network interface.
pcap_list
:str
orlist
ofstr
, default: None
-
- Path to a list of pcap files, e.g.,
'/tmp/myPcaps.txt'
. - Or list of path to pcap files, e.g.,
['file1.pcap', 'file2.pcap']
.
- Path to a list of pcap files, e.g.,
output_prefix
:str
, default: None
- If
None
, automatically derived from input. log_file
:bool
, default: False
- Save the final report in a
_log.txt
file. save_monitoring
:bool
, default: False
- Save the monitoring report in a
_monitoring.txt
file. packet_mode
:bool
, default: False
- Activate Tranalyzer2 packet mode.
plugin_folder
:str
, default: None
- Path to the plugin folder.
loading_list
:str
, default: None
-
Path to a plugin loading list.
If set, the
plugins
parameter is ignored. plugins
:list
ofstr
, default: None
-
The list of plugins to use.
If
None
, load the plugins available in the plugin folder. bpf
:str
, default: None
- A BPF filter.
t2_exec
:str
, default: None
-
Path to the Tranalyzer2 executable.
If
None
, useT2Utils.t2_exec()
timeout
:int
, default: None
-
Number of seconds after which to terminate the process.
If
None
, run forever or until the end of file is reached. verbose
:bool
, default: False
-
- If
True
, print the output (stdout
andstderr
) of the command. - If
False
, do not print anything.
- If
Raises
OSError
- If specified interface
iface
does not exist locally. RuntimeError
- If none or more than one input (
pcap
,pcap_list
,iface
) is specified. subprocess.CalledProcessError
- If the process exited with a non-zero exit code.
subprocess.TimeoutExpired
- If
timeout
was expired and its value reached.
Examples
>>> T2Utils.run_tranalyzer(pcap='/tmp/file.pcap', output_prefix='/tmp/')
def set_config(plugin: str, flag_name: Union[str, Dict[str, Any]], flag_value: Any = None, outfile: str = None, verbose: bool = False)
-
Set the value of a define in a configuration or header file.
Parameters
plugin
:str
- The name of a plugin.
flag_name
:str
ordict
ofstr, Any
- The name of the configuration flag to set or a dictionary of key/value to set.
flag_value
:Any
, default: None
-
The new value for
flag_name
.Use
'default'
to reset the value to its default.Not required if
flag_name
is a dictionary of key/value to set. outfile
:str
, default: None
- Use
'source'
to set the value in the header file. verbose
:bool
, default: False
-
- If
True
, print the output (stdout
andstderr
) of the command. - If
False
, do not print anything.
- If
Raises
NameError
- If the plugin could not be found.
subprocess.CalledProcessError
- If the process exited with a non-zero exit code.
Notes
Equivalent to:
$ t2conf pluginName -D name1=value1 [-D name2=value2 ...] [-g outfile]
.Examples
>>> T2Utils.set_config('arpDecode', 'MAX_IP', 25)
def set_default(plugin: Union[str, List[str]], name: Union[str, List[str]] = None, outfile: str = None, verbose: bool = False)
-
Reset the value of a define to its default.
Parameters
plugin
:str
orlist
ofstr
-
plugin
can be:- a single plugin, e.g.,
'pluginName'
- a list of plugins, e.g.,
['pluginName1', 'pluginName2']
all
to apply the operation to all available plugins
- a single plugin, e.g.,
name
:str
orlist
ofstr
, default: None
-
The name of a configuration flag to reset.
If
name
is set, only reset the value for the specified flag(s). outfile
:str
, default: None
-
Path to a configuration file where the changes will be rest.
Use
outfile='source'
to set the value in the header file. verbose
:bool
, default: False
-
- If
True
, print the output (stdout
andstderr
) of the command. - If
False
, do not print anything.
- If
Raises
NameError
- If a plugin could not be found.
NotImplementedError
- If trying to reset specific flag(s) with multiple plugins.
subprocess.CalledProcessError
- If the process exited with a non-zero exit code.
See Also
T2Utils.set_config()
- Set the value of a define in a configuration or header file.
T2Utils.reset_config()
- Reset all configuration flags from a plugin to their default values.
Notes
Equivalent to:
$ t2conf pluginName -D name=default [-g outfile]
.Alias for
T2Utils.set_config(plugin, name, 'default', outfile)
.Examples
>>> T2Utils.set_default('arpDecode', 'MAX_IP') >>> T2Utils.set_default('basicStats', ['BS_VAR', 'BS_STDDEV']) >>> T2Utils.set_default(['arpDecode', 'basicStats'])
def t2_exec(debug: bool = False) ‑> str
-
Return the path to Tranalyzer2 release or debug executable.
Parameters
debug
:bool
, default: False
-
- If
True
, returns the path to the debug executable.- If
False
, returns the path to the release executable.
- If
- If
Returns
str
- The path to Tranalyzer2 executable.
Examples
>>> T2Utils.t2_exec() /home/user/tranalyzer2-0.9.0/tranalyzer2/build/tranalyzer >>> T2Utils.t2_exec(debug=True) /home/user/tranalyzer2-0.9.0/tranalyzer2/debug/tranalyzer
def tawk(program: str = None, filename: str = None, options: List[str] = None) ‑> str
-
Call
tawk
with theprogram
,options
andfilename
specified.Parameters
program
:str
, default: None
- A
tawk
program, e.g.,"$dstPort == 80 { print tuple5() }"
. filename
:str
, default: None
- Name of the file to analyze.
options
:list
ofstr
, default: None
- List of options to pass to
tawk
.
Returns
str
- The result of the
tawk
command.
Raises
RuntimeError
- If
filename
was specified, but the file does not exist. subprocess.CalledProcessError
- If the process exited with a non-zero exit code.
Notes
Equivalent to:
$ tawk [options] 'program' filename
.Examples
>>> result = T2Utils.tawk(None, None, ['-V', 'flowStat=0x1234']) >>> print(result) >>> T2Utils.tawk('host("1.2.3.4") { aggr(proto()) }', 'file.txt')
def to_json_array(infile: str, delimiter: str = '\t') ‑> List[Dict[str, Any]]
-
Convert a flow or packet file to an array of JSON objects.
Parameters
infile
:str
- Path to a flow or packet file.
delimiter
:str
, default: '\t'
- Field delimiter used in the input file.
Returns
list
ofdict
-
The list of rows contained in
infile
.Each row is represented as a dict with the column names as key.
Raises
json.decoder.JSONDecodeError
- If
infile
was a JSON file and the decoding process failed.
Examples
>>> flows = T2Utils.to_json_array('/tmp/file_flows.txt') >>> flows [{'dir': 'A', 'flowInd': 1, ...}, {'dir': 'B', 'flowInd': 1, ...}, ...]
def to_pandas(infile: str, delimiter: str = None)
-
Convert a flow or packet file to pandas
DataFrame
.Parameters
infile
:str
- Path to a flow or packet file.
delimiter
:str
, default: None
- Field delimiter used in the input file.
Returns
pd.DataFrame
- DataFrame holding the tabular data stored in
infile
.
Examples
>>> df = T2Utils.to_pandas('/tmp/file_flows.txt') >>> type(df) <class 'pandas.core.frame.DataFrame'>
def to_pdf(pcap: str = None, flow_file: str = None, prefix: str = None, config: bool = True, reset_config: bool = True, open_pdf: bool = False, verbose: bool = False)
-
Generate a PDF/LaTeX report from a PCAP or Tranalyzer flow file."
Parameters
pcap
:str
, default: None
- PCAP file to analyze.
file
:str
, default: None
- Flow file to analyze.
prefix
:str
, default: None
- Append 'prefix' to any output file produced.
config
:bool
, default: True
- If
True
, configure and build Tranalyzer2 and the plugins as required. reset_config
:bool
, default: True
- If
True
, reset tranalyzer2 and the plugins configuration at the end. open_pdf
:bool
, default: False
- If
True
, automatically open the generated PDF. verbose
:bool
, default: False
-
- If
True
, print the output (stdout
andstderr
) of the command. - If
False
, do not print anything.
- If
Raises
RuntimeError
-
- If one of the required parameter (pcap or flow_file) was not specified.
- If multiple inputs (pcap and flow_file) were specified.
subprocess.CalledProcessError
- If the process exited with a non-zero exit code.
Notes
Equivalent to:
$ t2fm [-b] [-A] [-r pcap|-F file]
.Examples
>>> T2Utils.to_pdf(pcap='file.pcap') >>> T2Utils.to_pdf(flow_file='file_flows.txt')
def unload(plugin: Union[str, List[str]], plugin_folder: str = None, verbose: bool = False)
-
Remove (unload) a plugin from the plugin folder.
Parameters
plugin
:str
orlist
ofstr
-
plugin
can be:- a single plugin, e.g.,
'pluginName'
- a list of plugins, e.g.,
['pluginName1', 'pluginName2']
- a plugin loading list, e.g.,
'myPlugins.txt'
all
to apply the operation to all available plugins
- a single plugin, e.g.,
plugin_folder
:str
, default: None
-
The plugin folder from where the plugin must be unloaded.
If
None
, use the default plugin folder ('$HOME/.tranalyzer/plugins'
). verbose
:bool
, default: False
-
- If
True
, print the output (stdout
andstderr
) of the command. - If
False
, do not print anything.
- If
Raises
subprocess.CalledProcessError
- If the process exited with a non-zero exit code.
Notes
Equivalent to: -
$ t2build -u pluginName
-$ t2build -u pluginName1 pluginName2
-$ t2build -u -b myPlugins.txt
-$ t2build -u -a
Examples
>>> T2Utils.unload('arpDecode') >>> T2Utils.unload(['arpDecode', 'txtSink']) >>> T2Utils.unload('/tmp/myPlugins.load') >>> T2Utils.unload('all')
def valid_plugin_names() ‑> List[str]
-
Return a list of valid plugin names.
Returns
list
ofstr
- The list of valid plugin names.
Examples
>>> T2Utils.valid_plugin_names() ['arpDecode', ..., 'tranalyzer2', ...]