Module t2py.T2
Classes
class T2 (pcap: str = None,
iface: str = None,
pcap_list: str | List[str] = None,
output_prefix: str = None,
save_monitoring: bool = False,
packet_mode: bool = False,
plugin_folder: str = None,
loading_list: str = None,
plugins: List[str] = None,
output_format: str | List[str] = None,
bpf: str = None,
streaming: bool = False)-
Expand source code
class T2: """Manage several `t2py.T2Plugin` objects and run T2, convert/display flow file, ... Parameters ---------- pcap: str, default: None Path to a pcap file. iface: str, default: None Name of a network interface. pcap_list : str or list of str, default: None - Path to a list of pcap files, e.g., `'/tmp/myPcaps.txt'`. - Or list of path to pcap files, e.g., `['file1.pcap', 'file2.pcap']`. output_prefix: str, default: None If `None`, automatically derived from input. save_monitoring: bool, default: False Save the monitoring report in a `_monitoring.txt` file. packet_mode: bool, default: False Activate Tranalyzer2 packet mode. plugin_folder: str, default: None Path to the plugin folder. If `None`, default to `$HOME/.tranalyzer/plugins`. loading_list: str, default: None Path to a plugin loading list. plugins: list of str, default: None, A list of plugin names to load. output_format: str or list of str, default: None Select and configure the sink plugin(s) required to generate the requested output format. See also `T2.add_output_format()`. bpf: str, default: None A BPF filter. streaming: bool, default: False If `True`, then `T2.stream()` can be used to yield one flow at a time. Raises ------ OSError If specified interface `iface` does not exist locally. RuntimeError If more than one input (`pcap`, `pcap_list`, `iface`) is specified. """ plugin_folder: str = None """Path to the plugin folder. If `None`, default to `'$HOME/.tranalyzer/plugins'`.""" def __init__( self, pcap: str = None, iface: str = None, pcap_list: Union[str, List[str]] = None, output_prefix: str = None, save_monitoring: bool = False, packet_mode: bool = False, plugin_folder: str = None, loading_list: str = None, plugins: List[str] = None, output_format: Union[str, List[str]] = None, bpf: str = None, streaming: bool = False ): if sum(bool(i) for i in (pcap, pcap_list, iface)) > 1: raise RuntimeError('Only one input (pcap, pcap_list, iface) may be specified') if iface and iface not in T2Utils.network_interfaces(): raise OSError(f'Interface {iface} does not exist locally') # Private variables self._whitelisted_plugins = [] self._default_plugin_folder = None self._tranalyzer2 = None self._plugins = {} self._conn = None self._streaming = False # Input self.pcap = pcap self.iface = iface self.pcap_list = pcap_list # Output self.output_prefix = output_prefix self.save_monitoring = save_monitoring self.packet_mode = packet_mode # Options self.bpf = bpf self.plugin_folder = plugin_folder self.loading_list = loading_list self.streaming = streaming if plugins: self.add_plugins(plugins) if output_format: self.add_output_format(output_format) def add_output_format( self, extension: Union[str, List[str]] ): """Select and configure the sink plugin(s) required to generate the requested output format. Parameters ---------- extension : str or list of str `extension` may be any of: - `'bin'` -> binSink - `'bin.gz'` -> binSink + `BFS_GZ_COMPRESS = 1` - `'csv'` -> txtSink + `SEP_CHR = ','` - `'csv.gz'` -> txtSink + `SEP_CHR = ','` + `TFS_GZ_COMPRESS = 1` - `'json'` -> jsonSink - `'json.gz'` -> jsonSink + `JSON_GZ_COMPRESS = 1` - `'txt'` -> txtSink - `'txt.gz'` -> txtSink + `TFS_GZ_COMPRESS = 1` - `'netflow'` -> netflowSink - `'mysql'` -> mysqlSink - `'sqlite'` -> sqliteSink - `'postgres'` -> psqlSink - `'mongo'` -> mongoSink - `'socket'` -> socketSink - `'streaming'` -> socketSink + `SKS_CONTENT_TYPE = 2` - `'pcap'` -> pcapd Raises ------ RuntimeError If one of the specified extension is not supported. Examples -------- >>> t2 = T2() >>> t2.add_output_format(['json.gz', 'txt']) """ if isinstance(extension, list): extensions = extension else: extensions = [extension] for ext in extensions: if ext == 'bin': self.add_plugin('binSink') elif ext == 'bin.gz': self.add_plugin('binSink') self.binSink.BFS_GZ_COMPRESS = 1 elif ext == 'csv': self.add_plugin('txtSink') self.txtSink.SEP_CHR = ',' elif ext == 'csv.gz': self.add_plugin('txtSink') self.txtSink.SEP_CHR = ',' self.txtSink.TFS_GZ_COMPRESS = 1 elif ext == 'txt': self.add_plugin('txtSink') elif ext == 'txt.gz': self.add_plugin('txtSink') self.txtSink.TFS_GZ_COMPRESS = 1 elif ext == 'json': self.add_plugin('jsonSink') elif ext == 'json.gz': self.add_plugin('jsonSink') self.jsonSink.JSON_GZ_COMPRESS = 1 elif ext == 'netflow': self.add_plugin('netflowSink') elif ext == 'mysql': self.add_plugin('mysqlSink') elif ext == 'sqlite': self.add_plugin('sqliteSink') elif ext == 'postgres': self.add_plugin('psqlSink') elif ext == 'mongo': self.add_plugin('mongoSink') elif ext == 'socket': self.add_plugin('socketSink') elif ext == 'pcap': self.add_plugin('pcapd') elif ext == 'streaming': self.streaming = True else: raise RuntimeError(f"output format '{ext}' not supported") @property def default_plugin_folder(self) -> str: """Path to the default plugin folder, i.e., `$HOME/.tranalyzer/plugins`.""" if not self._default_plugin_folder: self._default_plugin_folder = join(expanduser('~'), '.tranalyzer', 'plugins') return self._default_plugin_folder @property def tranalyzer2(self) -> T2Plugin: """`t2py.T2Plugin` object representing Tranalyzer2 (core).""" if not self._tranalyzer2: self._tranalyzer2 = T2Plugin('tranalyzer2') return self._tranalyzer2 @property def loading_list(self): """Get or set the path to a plugin loading list. In addition, the setter calls the `T2.add_plugin()` function for each plugin listed in the file. """ return self._loading_list @loading_list.setter def loading_list(self, loading_list: str): self._whitelisted_plugins.clear() self._loading_list = loading_list if loading_list: directory = dirname(loading_list) if not isdir(directory): makedirs(directory) with open(loading_list, encoding='utf-8') as f: for plugin in f: plugin = plugin.strip() if len(plugin) > 0 and not plugin.startswith('#'): self.add_plugin(plugin) self._whitelisted_plugins.append(plugin) @property def plugins(self) -> Dict[str, T2Plugin]: """Dictionary of `t2py.T2Plugin` objects whose key is the plugin name.""" return self._plugins @property def t2_exec(self) -> str: """Path to the Tranalyzer2 executable.""" if self.plugin_folder: return join(self.plugin_folder, 'bin', 'tranalyzer') return join(self.default_plugin_folder, 'bin', 'tranalyzer') @property def streaming(self): """Allow `tranalyzer2` to run in streaming mode. Call `T2.stream()` to access flows one by one. """ return self._streaming @streaming.setter def streaming(self, activate: bool): if activate: self.add_plugin('socketSink') self.socketSink.SKS_CONTENT_TYPE = 2 self._streaming = activate def headers_file(self) -> str: """Return the filename of the headers file. Returns ------- str The filename of the headers file. See Also -------- T2.headers : Return Tranalyzer2 `_headers.txt` file contents. T2.print_headers : Print the contents of Tranalyzer2 `_headers.txt` file. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.run() >>> t2.headers_file() '/tmp/file_headers.txt' """ output_prefix = self._output_prefix() return output_prefix + '_headers.txt' if output_prefix else None def flow_file_json(self) -> str: """Return the filename of the JSON flow file. Returns ------- str The filename of the JSON flow file. See Also -------- T2.flows_json : Return Tranalyzer2 `_flows.json` file contents as an array of JSON objects. T2.print_flows_json : Print the contents of Tranalyzer2 `_flows.json` file. T2.flow_file_txt : Return the filename of the TXT flow file. T2.flows_txt : Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objects. T2.print_flows_txt : Print the contents of Tranalyzer2 `_flows.txt` file. T2.flow_file : Return the filename of the JSON (preferred) or TXT flow file. T2.flows : Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objects. T2.print_flows : Print the contents of Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', output_format='json') >>> t2.run() >>> t2.flow_file_json() '/tmp/file_flows.json' """ output_prefix = self._output_prefix() return output_prefix + '_flows.json' if output_prefix else None def flow_file_txt(self) -> str: """Return the filename of the TXT flow file. Returns ------- str The filename of the TXT flow file. See Also -------- T2.flows_txt : Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objects. T2.print_flows_txt : Print the contents of Tranalyzer2 `_flows.txt` file. T2.flow_file_json : Return the filename of the JSON flow file. T2.flows_json : Return Tranalyzer2 `_flows.json` file contents as an array of JSON objects. T2.print_flows_json : Print the contents of Tranalyzer2 `_flows.json` file. T2.flow_file : Return the filename of the JSON (preferred) or TXT flow file. T2.flows : Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objects. T2.print_flows : Print the contents of Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', output_format='csv') >>> t2.run() >>> t2.flow_file_txt() '/tmp/file_flows.txt' """ output_prefix = self._output_prefix() return output_prefix + '_flows.txt' if output_prefix else None def flow_file(self) -> str: """Return the filename of the JSON (preferred) or TXT flow file. Returns ------- str The filename of the flow file. See Also -------- T2.flows : Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objects. T2.print_flows : Print the contents of Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file. T2.flow_file_json : Return the filename of the JSON flow file. T2.flows_json : Return Tranalyzer2 `_flows.json` file contents as an array of JSON objects. T2.print_flows_json : Print the contents of Tranalyzer2 `_flows.json` file. T2.flow_file_txt : Return the filename of the TXT flow file. T2.flows_txt : Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objects. T2.print_flows_txt : Print the contents of Tranalyzer2 `_flows.txt` file. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', output_format=['json', 'txt']) >>> t2.run() >>> t2.flow_file() '/tmp/file_flows.json' """ if 'jsonSink' in self._plugins: return self.flow_file_json() if 'txtSink' in self._plugins: return self.flow_file_txt() return None def packet_file(self) -> str: """Return the filename of the packet file. Returns ------- str The filename of the packet file. See Also -------- T2.packets : Return Tranalyzer2 `_packets.txt` file contents as an array of JSON objects. T2.print_packets : Print the contents of Tranalyzer2 `_packets.txt`. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap') >>> t2.run(packet_mode=True) >>> t2.packet_file() '/tmp/file_packets.txt' """ output_prefix = self._output_prefix() return output_prefix + '_packets.txt' if output_prefix else None def log_file(self) -> str: """Return the filename of the log file. Returns ------- str The filename of the log file. See Also -------- T2.log : Return Tranalyzer2 `_log.txt` file contents. T2.print_log : Print the contents of Tranalyzer2 `_log.txt` file. T2.report : Alias for `T2.log()`. T2.print_report : Alias for `T2.print_log()`. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', output_format='txt') >>> t2.run() >>> t2.log_file() '/tmp/file_log.txt' """ output_prefix = self._output_prefix() return output_prefix + '_log.txt' if output_prefix else None def monitoring_file(self) -> str: """Return the filename of the monitoring file. Returns ------- str The filename of the monitoring file. See Also -------- T2.monitoring : Return Tranalyzer2 `_monitoring.txt` file contents. T2.print_monitoring : Print the contents of Tranalyzer2 `_monitoring.txt` file. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', output_format='txt', monitoring_file=True) >>> t2.run() >>> t2.monitoring_file() '/tmp/file_monitoring.txt' """ output_prefix = self._output_prefix() return output_prefix + '_monitoring.txt' if output_prefix else None # ====================================================================== # def status(self): """Print the current configuration status of Tranalyzer2 and the plugins (sorted by plugin numbers). Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', output_format='csv.gz') >>> t2.status() """ bold = '\033[1m' blue = '\033[0;34m' orange = '\033[0;33m' reset = '\033[0m' def _pending_changes(): if pending_changes == 0: return '' return (' ' f'{blue}' f'[{pending_changes} change{"s" if pending_changes > 1 else ""} pending]' f'{reset}') def _loading_list_status(): if self.loading_list and plugin_name not in self._whitelisted_plugins: return f' {orange}[NOT IN LOADING LIST]{reset}' return '' if self._tranalyzer2: t2 = self.tranalyzer2 pending_changes = len(t2.changes) if pending_changes > 0: print(f'{bold}Tranalyzer2{reset}', end='') print(f'{blue}{_pending_changes()}{reset}:') for name, value in t2.changes.items(): print(f' {name} = {value}') print() print(bold, 'Plugins:', reset) num_plugins = 0 for plugin_name in sorted(self._plugins, key=lambda name: self._plugins[name].number): plugin = self._plugins[plugin_name] pending_changes = len(plugin.changes) print(f' {num_plugins + 1}: {plugin_name}' f'{_pending_changes()}' f'{_loading_list_status()}') if pending_changes > 0: for name, value in plugin.changes.items(): print(f' {name} = {value}') num_plugins += 1 if num_plugins == 0: print(' None') print(f'\n{bold}Tranalyzer options:{reset}') if self.pcap: print(f' -r {self.pcap}') elif self.iface: print(f' -i {self.iface}') elif self.pcap_list: if isinstance(self.pcap_list, list): print(' -R /tmp/pcap_list.txt') else: print(f' -R {self.pcap_list}') if self.output_prefix: print(f' -w {self.output_prefix}') if self.loading_list: print(f' -b {self.loading_list}') if self.plugin_folder: print(f' -p {self.plugin_folder}') if self.save_monitoring: print(' -m') if self.packet_mode: print(' -s') print(' -l') if self.bpf: print(f' BPF filter: {self.bpf}') # ====================================================================== # def clear_plugins(self): """Clear the list of plugins. Examples -------- >>> t2 = T2(plugins=['arpDecode', 'basicStats']) >>> t2.clear_plugins() >>> t2.list_plugins() [] """ self._remove_dict_entries(list(self._plugins.keys())) self._plugins.clear() def set_plugins( self, plugins: List[str] = None ): """Overwrite the current list of plugins. Parameters ---------- plugins : list of str, default: None The list of plugins to use. If `None`, empty the list of plugins. See Also -------- T2.list_plugins : Alphabetically list the currently active plugins. Examples -------- >>> t2 = T2() >>> t2.set_plugins(['arpDecode', 'basicStats']) >>> t2.list_plugins() ['arpDecode', 'basicStats'] >>> type(t2.arpDecode) <class 't2py.T2Plugin.T2Plugin'> >>> t2.basicStats.description 'Basic statistics' """ self.clear_plugins() if plugins: self._plugins = {plugin: T2Plugin(plugin) for plugin in plugins} self._add_dict_entries(plugins) def add_plugin( self, plugin: str ): """Add a plugin to the list of plugins. Parameters ---------- plugin : str The name of the plugin to add. See Also -------- T2.add_plugins : Add a list of plugins to the list of plugins. T2.remove_plugin : Remove a plugin from the list of plugins. T2.remove_plugins : Remove a list of plugins from the list of plugins. Examples -------- >>> t2 = T2() >>> t2.add_plugin('arpDecode') >>> type(t2.arpDecode) <class 't2py.T2Plugin.T2Plugin'> >>> t2.arpDecode.number '179' """ if plugin not in self._plugins: self._plugins[plugin] = T2Plugin(plugin) self._add_dict_entry(plugin) def add_plugins( self, plugins: List[str] ): """Add a list of plugins to the list of plugins. Parameters ---------- plugins : list of str The list of plugins to add. See Also -------- T2.add_plugin : Add a plugin to the list of plugins. T2.remove_plugin : Remove a plugin from the list of plugins. T2.remove_plugins : Remove a list of plugins from the list of plugins. Examples -------- >>> t2 = T2() >>> t2.add_plugins(['arpDecode', 'basicStats']) >>> type(t2.arpDecode) <class 't2py.T2Plugin.T2Plugin'> >>> t2.basicStats.description 'Basic statistics' """ for plugin in plugins: self.add_plugin(plugin) def remove_plugin( self, plugin: str ): """Remove a plugin from the list of plugins. Parameters ---------- plugin : str The name of the plugin to remove. See Also -------- T2.add_plugin : Add a plugin to the list of plugins. T2.add_plugins : Add a list of plugins to the list of plugins. T2.remove_plugins : Remove a list of plugins from the list of plugins. Examples -------- >>> t2 = T2(plugins=['arpDecode', 'basicStats']) >>> t2.remove_plugin('arpDecode') >>> t2.list_plugins() ['basicStats'] """ self._remove_dict_entry(plugin) self._plugins.pop(plugin) def remove_plugins( self, plugins: List[str] ): """Remove a list of plugins from the list of plugins. Parameters ---------- plugins : list of str The list of plugins to remove. See Also -------- T2.add_plugin : Add a plugin to the list of plugins. T2.add_plugins : Add a list of plugins to the list of plugins. T2.remove_plugin : Remove a plugin from the list of plugins. Examples -------- >>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags']) >>> t2.remove_plugins(['arpDecode', 'basicStats']) >>> t2.list_plugins() ['tcpFlags'] """ for plugin in plugins: self.remove_plugin(plugin) # ========================================================================= def list_plugins(self): """Alphabetically list the loaded plugins. Examples -------- >>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags']) >>> t2.list_plugins() ['arpDecode', 'basicStats', 'tcpFlags'] >>> t2.arpDecode.description 'Address Resolution Protocol (ARP)' """ return sorted(self._plugins.keys()) def create_plugin_list( self, plugins: List[str] = None, outfile: str = None, verbose: bool = False ): """Create a loading list of plugins. Parameters ---------- plugins : list of str, default: None List of plugin names. If `None`, use `T2.list_plugins()`. outfile : str, default: None Name of the file to save. If `None`, default to `'plugins.load'` in the plugin folder. verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ subprocess.CalledProcessError If the process exited with a non-zero exit code. Examples -------- >>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags']) >>> t2.create_plugin_list() >>> t2.create_plugin_list(outfile='/tmp/myPlugins.txt') >>> t2.create_plugin_list(['basicFlow', 'txtSink']) >>> t2.create_plugin_list(['basicFlow', 'txtSink'], '/tmp/myPlugins.txt') """ if not outfile: if self.plugin_folder: outfile = join(self.plugin_folder, 'plugins.load') else: outfile = join(self.default_plugin_folder, 'plugins.load') if not plugins: plugins = list(self._plugins.keys()) T2Utils.create_plugin_list(plugins, outfile, verbose=verbose) self.loading_list = outfile # ========================================================================= def build( self, plugin: Union[str, List[str]] = None, plugin_folder: str = None, force_rebuild: bool = False, debug: bool = False, verbose: bool = False ): """Build Tranalyzer2 and the plugins. Parameters ---------- plugin : str or list of str, default: None List of plugin names to build or path to a plugin loading list. If `None`, use `T2.list_plugins()`. plugin_folder : str, default: None Path to the plugin folder. If `None`, use `T2.plugin_folder`. force_rebuild : bool, default: False Force the rebuild of the configuration files (`$ t2build -r ...`). debug : bool, default: False Build the plugin in debug mode. verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ subprocess.CalledProcessError If the process exited with a non-zero exit code. See Also -------- T2.clean : Remove files generated by `T2.build()`. Examples -------- >>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags']) >>> t2.build() """ plugin_folder = plugin_folder if plugin_folder else self.plugin_folder to_build = plugin if plugin else list(self._plugins.keys()) if not plugin: to_build.append('tranalyzer2') T2Utils.build(to_build, plugin_folder, force_rebuild, debug, verbose=verbose) def clean( self, plugin: Union[str, List[str]] = None, verbose: bool = False ): """Clean Tranalyzer2 and the plugins. Parameters ---------- plugin : str or list of str, default: None List of plugin names to clean, or path to a plugin loading list. If `None`, use `T2.list_plugins()`. verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ subprocess.CalledProcessError If the process exited with a non-zero exit code. See Also -------- T2.build : Build Tranalyzer2 and the plugins. Examples -------- >>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags']) >>> t2.build() >>> t2.clean() """ to_clean = plugin if plugin else list(self._plugins.keys()) if not plugin: to_clean.append('tranalyzer2') T2Utils.clean(to_clean, verbose=verbose) def unload( self, plugin: Union[str, List[str]] = None, plugin_folder: str = None, verbose: bool = False ): """Remove (unload) plugin(s) from the plugin folder. Parameters ---------- plugin : str or list of str, default: None `plugin` can be: - a single plugin, e.g., `'pluginName'` - a list of plugins, e.g., `['pluginName1', 'pluginName2']` - a plugin loading list, e.g., `'myPlugins.txt'` - `all` to apply the operation to all available plugins - If `None`, use `T2.list_plugins()`. plugin_folder : str, default: None The plugin folder from where the plugin must be unloaded. If `None`, use `T2.plugin_folder`. verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ subprocess.CalledProcessError If the process exited with a non-zero exit code. Examples -------- >>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags']) >>> t2.build() >>> t2.unload() """ plugin_folder = plugin_folder if plugin_folder else self.plugin_folder to_unload = plugin if plugin else list(self._plugins.keys()) T2Utils.unload(to_unload, plugin_folder, verbose=verbose) def apply_changes( self, verbose: bool = False ): """Apply/persist the current configuration to the source files. Parameters ---------- verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ subprocess.CalledProcessError If one of the processes exited with a non-zero exit code. Examples -------- >>> t2 = T2(output_format='csv.gz') >>> t2.apply_changes() """ self.tranalyzer2.apply_changes(verbose=verbose) for plugin in self._plugins.values(): plugin.apply_changes(verbose=verbose) def discard_changes(self): """Discard all non-committed changes made in this session. Examples -------- >>> t2 = T2(plugins=['arpDecode', 'txtSink']) >>> t2.arpDecode.MAX_IP = 5 >>> t2.discard_changes() """ for plugin in self._plugins.values(): plugin.discard_changes() def reset(self): """Reset all plugins' flags to their default value. Examples -------- >>> t2 = T2(plugins=['arpDecode', 'txtSink']) >>> t2.arpDecode.MAX_IP = 5 >>> t2.apply_config() >>> t2.reset() >>> t2.apply_config() """ for plugin in self._plugins.values(): plugin.reset() def _setup_streaming(self): assert 'socketSink' in self._plugins # start socket listener in separate thread def _socket_sink_listener(self): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind((self.socketSink.SKS_SERVADD, self.socketSink.SKS_DPORT)) s.listen(1) self._conn, _ = s.accept() s.close() print('Setting up streaming server' f'{self.socketSink.SKS_SERVADD}:{self.socketSink.SKS_DPORT}') thread = threading.Thread(target=_socket_sink_listener, args=(self,)) thread.daemon = True # stop thread when _socket_sink_listener returns thread.start() def run( self, pcap: str = None, iface: str = None, pcap_list: Union[str, List[str]] = None, output_prefix: str = None, save_monitoring: bool = False, packet_mode: bool = False, plugins: List[str] = None, plugin_folder: str = None, loading_list: str = None, bpf: str = None, rebuild: bool = False, streaming: bool = False, timeout: int = None, verbose: bool = False ): """Run Tranalyzer2. Parameters ---------- pcap : str, default: None Path to a pcap file. iface : str, default: None Name of a network interface. pcap_list : str or list of str, default: None Path to a list of pcap files, e.g., `'/tmp/myPcaps.txt'` or list of path to pcap files, e.g., `['file1.pcap', 'file2.pcap']`. output_prefix : str, default: None If `None`, automatically derived from input. save_monitoring: bool, default: False Save the monitoring report in a `_monitoring.txt` file. packet_mode : bool, default: False Activate Tranalyzer2 packet mode. plugin_folder : str, default: None Path to the plugin folder. plugins : list of str, default: None The list of plugins to use. If `None`, use `T2.list_plugins()`. loading_list : str, default: None Path to a plugin loading list. If `None`, create one using `T2.list_plugins()`. bpf : str, default: None A BPF filter. rebuild : bool, default, default: False Apply the current changes and rebuild Tranalyzer2 and the plugins. streaming: bool, default: False If `True`, then `T2.stream()` can be used to yield one flow at a time. timeout : int, default: None Number of seconds after which to terminate the process. If `None`, run forever or until the end of file is reached. verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ OSError If specified interface `iface` does not exist locally. RuntimeError - If none or more than one input (`pcap`, `pcap_list`, `iface`) is specified. - If socketSink plugin is not loaded or misconfigured when `T2.streaming` is `True`. subprocess.CalledProcessError If the process exited with a non-zero exit code. subprocess.TimeoutExpired If `timeout` was expired and its value reached. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', output_format='txt') >>> t2.build() >>> t2.run() >>> t2.run(packet_mode=True) """ inputs = (pcap, pcap_list, iface) if sum(bool(i) for i in inputs) > 1: raise RuntimeError('Only one input (pcap, pcap_list, iface) may be specified') inputs += (self.pcap, self.pcap_list, self.iface) if not any(bool(i) for i in inputs): raise RuntimeError('An input (pcap, pcap_list or iface) is required') if pcap: self.pcap = pcap self.pcap_list = None self.iface = None elif iface: self.pcap = None self.pcap_list = None self.iface = iface elif pcap_list: self.pcap = None self.pcap_list = pcap_list self.iface = None if output_prefix: self.output_prefix = output_prefix if plugin_folder: self.plugin_folder = plugin_folder if loading_list: self.loading_list = loading_list elif not plugins: plugins = list(self._plugins.keys()) if save_monitoring: self.save_monitoring = save_monitoring if packet_mode: self.packet_mode = packet_mode if bpf: self.bpf = bpf if streaming or self.streaming: if 'socketSink' not in self._plugins: raise RuntimeError('socketSink plugin is required for running in streaming mode') if (self.socketSink.SKS_SOCKTYPE != 1 or self.socketSink.SKS_CONTENT_TYPE != 2 or self.socketSink.SKS_GZ_COMPRESS != 0): raise RuntimeError("Invalid configuration for socketSink in streaming mode: " "requires 'SKS_SOCKTYPE=1', 'SKS_CONTENT_TYPE=2' and " "'SKS_GZ_COMPRESS=0'") self.streaming = True if rebuild: if not self.loading_list: self.apply_changes() self.build() else: self.tranalyzer2.apply_changes() self.tranalyzer2.build() with open(self.loading_list, encoding='utf-8') as f: for plugin in f: T2Utils.apply_config(plugin.strip()) self.build(plugin=self.loading_list) def _run_tranalyzer(self): T2Utils.run_tranalyzer( t2_exec=self.t2_exec, pcap=self.pcap, iface=self.iface, pcap_list=self.pcap_list, output_prefix=self.output_prefix, log_file=True, save_monitoring=self.save_monitoring, packet_mode=self.packet_mode, plugin_folder=self.plugin_folder, bpf=self.bpf, loading_list=self.loading_list, plugins=plugins, timeout=timeout, verbose=verbose) if not self.streaming: _run_tranalyzer(self) else: self._setup_streaming() thread = threading.Thread(target=_run_tranalyzer, args=(self,)) thread.daemon = True # stop thread when run_tranalyzer returns thread.start() # ========================================================================= # Access _headers.txt, _flows.txt, _packets.txt and _log.txt # ========================================================================= def stream(self) -> Iterator[Dict[str, Any]]: """Iterator returning flow by flow. Raises ------ RuntimeError - If the function was called, without configuring streaming mode first. - If socketSink did not open connection. Yields ------ dict The next flow represented as a dict with the column names as key. Examples -------- >>> t2 = T2(streaming=True, plugins=['basicFlow']) >>> t2.run(pcap='file.pcap') >>> for flow in t2.stream(): ... print(f"{flow['srcIP']}:{flow['srcPort']} -> {flow['dstIP']}:{flow['dstPort']}") """ if not self.streaming: raise RuntimeError("Calling 'stream' without configuring streaming mode first.") if not self._conn: time.sleep(2) # wait for tranalyzer to start and socketSink to connect if not self._conn: raise RuntimeError('socketSink did not open connection') buf = b'' while True: buf += self._conn.recv(1024) if not buf: break lines = buf.split(b'\n') for line in lines[:-1]: yield json.loads(line) buf = lines[-1] def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): if self._conn: self._conn.close() def _load_file(self, filename: str) -> str: """Load a headers/flows/packets/log file. Parameters ---------- filename : str The name of the file to load. """ if not filename or not isfile(filename): return None data = None with open(filename, encoding='utf-8') as f: data = f.read() return data def headers(self) -> str: """Return Tranalyzer2 `_headers.txt` file contents. Returns ------- str The raw contents of the header file. Raises ------ RuntimeError If the `_headers.txt` file could not be found. See Also -------- T2.headers_file : Return the filename of the headers file. T2.print_headers : Print the contents of Tranalyzer2 `_headers.txt` file. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> t2.headers() '# Date: 1628090651.486362 sec (Wed 04 Aug 2021 17:24:11 CEST)...' """ name = self.headers_file() if not name or not isfile(name): raise RuntimeError('Run Tranalyzer with the txtSink plugin to create a headers file') return self._load_file(name) def flows_json(self) -> List[Dict[str, Any]]: """Return Tranalyzer2 `_flows.json` file contents as an array of JSON objects. Returns ------- list of dict The list of rows contained in the JSON flow file. Each row is represented as a dict with the column names as key. Raises ------ RuntimeError If the `_flows.json` file could not be found. See Also -------- T2.flow_file_json : Return the filename of the JSON flow file. T2.flows : Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objects. T2.flows_txt : Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objects. T2.flow_file : Return the filename of the JSON (preferred) or TXT flow file. T2.flow_file_txt : Return the filename of the TXT flow file. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'jsonSink']) >>> t2.build() >>> t2.run() >>> t2.flows_json() [{'dir': 'A', 'flowInd': 1, ...}, {'dir': 'B', 'flowInd': 1, ...}, ...] """ name = self.flow_file_json() if not name or not isfile(name): raise RuntimeError('Run Tranalyzer with the jsonSink plugin to create ' 'a JSON flow file') return T2Utils.to_json_array(name) def flows_txt( self, delimiter: str = None ) -> List[Dict[str, Any]]: """Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objects. Parameters ---------- delimiter : str, default: None The delimiter used to separate columns in the input file. If `None`, default to '\\t' if the txtSink plugin is not loaded else to the value of `txtSink.SEP_CHR`. Returns ------- list of dict The list of rows contained in the TXT flow file. Each row is represented as a dict with the column names as key. Raises ------ RuntimeError If the `_flows.txt` file could not be found. See Also -------- T2.flow_file_txt : Return the filename of the TXT flow file. T2.flows : Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objects. T2.flows_json : Return Tranalyzer2 `_flows.json` file contents as an array of JSON objects. T2.flow_file : Return the filename of the JSON (preferred) or TXT flow file. T2.flow_file_json : Return the filename of the JSON flow file. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> t2.flows_txt() [{'dir': 'A', 'flowInd': 1, ...}, {'dir': 'B', 'flowInd': 1, ...}, ...] """ name = self.flow_file_txt() if not name or not isfile(name): raise RuntimeError('Run Tranalyzer with the txtSink plugin to create a flow file') if delimiter is None: if 'txtSink' not in self._plugins: delimiter = '\t' else: delimiter = self.txtSink.SEP_CHR.strip('"') if delimiter[0] == '\\': if delimiter[1] == 't': delimiter = '\t' elif delimiter[1] == 'n': delimiter = '\n' elif delimiter[1] == 'r': delimiter = '\r' elif delimiter[1] == '\\': delimiter = '\\' return T2Utils.to_json_array(name, delimiter) def flows(self) -> List[Dict[str, Any]]: """Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objects. Returns ------- list of dict The list of rows contained in the flow file. Each row is represented as a dict with the column names as key. Raises ------ RuntimeError If neither the `_flows.json` nor the `_flows.txt` file could be found. See Also -------- T2.flow_file : Return the filename of the JSON (preferred) or TXT flow file. T2.flow_file_json : Return the filename of the JSON flow file. T2.flow_file_txt : Return the filename of the TXT flow file. T2.flows_json : Return Tranalyzer2 `_flows.json` file contents as an array of JSON objects. T2.flows_txt : Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objects. T2.print_flows : Print the contents of Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file. T2.print_flows_json : Print the contents of Tranalyzer2 `_flows.json` file. T2.print_flows_txt : Print the contents of Tranalyzer2 `_flows.json` file. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> t2.flows() [{'dir': 'A', 'flowInd': 1, ...}, {'dir': 'B', 'flowInd': 1, ...}, ...] """ if 'jsonSink' in self._plugins: return self.flows_json() if 'txtSink' in self._plugins: return self.flows_txt() raise RuntimeError('Run Tranalyzer with the txtSink or jsonSink plugin to create ' 'a flow file') def packets(self) -> List[Dict[str, Any]]: """Return Tranalyzer2 `_packets.txt` file contents as an array of JSON objects. Returns ------- list of dict The list of rows contained in the packet file. Each row is represented as a dict with the column names as key. Raises ------ RuntimeError If the `_packets.txt` file could not be found. See Also -------- T2.packet_file : Return the filename of the packet file. T2.print_packets : Print the contents of Tranalyzer2 `_packets.txt` file. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'], packet_mode=True) >>> t2.build() >>> t2.run() >>> t2.packets() [{'pktNo': 1, 'flowInd': 1, ...}, {'pktNo': 2, 'flowInd': 1, ...}, ...] """ name = self.packet_file() if not name or not isfile(name): raise RuntimeError("Run Tranalyzer with 'packet_mode=True' to create a packet file") return T2Utils.to_json_array(name) def report(self) -> str: """Return Tranalyzer2 `_log.txt` file contents. Returns ------- str The raw contents of the log file. Raises ------ RuntimeError If the `_log.txt` file could not be found. See Also -------- T2.log_file : Return the filename of the log file. T2.log : Alias for `T2.report()` Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> t2.report() '================================================================================...' """ name = self.log_file() if not name or not isfile(name): raise RuntimeError("Run Tranalyzer to create a log file") return self._load_file(name) def log(self) -> str: """Return Tranalyzer2 `_log.txt` file contents. Returns ------- str The raw contents of the log file. Raises ------ RuntimeError If the `_log.txt` file could not be found. See Also -------- T2.report : Alias for `T2.log()` T2.log_file : Return the filename of the log file. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> t2.log() '================================================================================...' """ return self.report() def monitoring(self) -> List[Dict[str, Any]]: """Return Tranalyzer2 `_monitoring.txt` file contents as an array of JSON objects. Returns ------- list of dict The list of rows contained in the monitoring file. Each row is represented as a dict with the column names as key. Raises ------ RuntimeError If the `_monitoring.txt` file could not be found. See Also -------- T2.monitoring_file : Return the filename of the monitoring file. T2.print_monitoring : Print the contents of Tranalyzer2 `_monitoring.txt` file. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'], monitoring_file=True) >>> t2.tranalyzer2.MACHINE_REPORT = 1 >>> t2.tranalyzer2.apply_changes() >>> t2.build() >>> t2.run() >>> t2.monitoring() [{'repTyp': 'USR1MR_A', 'sensorID': 666, 'time': ...}, {'repTyp': 'USR1MR_A', ...}, ...] """ name = self.monitoring_file() if not name or not isfile(name): raise RuntimeError("Run Tranalyzer with 'save_monitoring=True' to create " "a monitoring file") return T2Utils.to_json_array(name) # ========================================================================= # Print _headers.txt, _flows.txt, _packets.txt and _log.txt files contents # ========================================================================= def print_headers(self): """Print the contents of Tranalyzer2 `_headers.txt` file. Raises ------ RuntimeError If the header file could not be found. See Also -------- T2.headers_file : Return the filename of the headers file. T2.headers : Return Tranalyzer2 `_headers.txt` file contents. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> t2.print_headers() # Date: 1628090651.486362 sec (Wed 04 Aug 2021 17:24:11 CEST) # Tranalyzer 0.8.11 (Anteater), Tarantula. ... """ name = self.headers_file() if not name or not isfile(name): raise RuntimeError('Run Tranalyzer with the txtSink plugin to create a headers file') with open(name, encoding='utf-8') as f: print(f.read()) def print_flows(self): """Print the contents of Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file. Raises ------ RuntimeError If the flow file could not be found. See Also -------- T2.flow_file : Return the filename of the JSON (preferred) or TXT flow file. T2.flows : Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objects. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> t2.print_flows() %dir flowInd ... A 1 ... B 1 ... ... """ name = self.flow_file() if not name or not isfile(name): raise RuntimeError('Run Tranalyzer with the txtSink plugin to create a flow file') with open(name, encoding='utf-8') as f: print(f.read()) def print_flows_json(self): """Print the contents of Tranalyzer2 `_flows.json` file. Raises ------ RuntimeError If the JSON flow file could not be found. See Also -------- T2.flow_file_json : Return the filename of the JSON flow file. T2.flows_json : Return Tranalyzer2 `_flows.json` file contents as an array of JSON objects. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'jsonSink']) >>> t2.build() >>> t2.run() >>> t2.print_flows_json() {"dir":"A","flowInd":1,...} {"dir":"B","flowInd":1,...} ... """ name = self.flow_file_json() if not name or not isfile(name): raise RuntimeError('Run Tranalyzer with the jsonSink plugin to create a JSON flow file') with open(name, encoding='utf-8') as f: print(f.read()) def print_flows_txt(self): """Print the contents of Tranalyzer2 `_flows.txt` file. Raises ------ RuntimeError If the flow file could not be found. See Also -------- T2.flow_file_txt : Return the filename of the TXT flow file. T2.flows_txt : Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objects. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> t2.print_flows_txt() %dir flowInd ... A 1 ... B 1 ... ... """ name = self.flow_file_txt() if not name or not isfile(name): raise RuntimeError('Run Tranalyzer with the txtSink plugin to create a flow file') with open(name, encoding='utf-8') as f: print(f.read()) def print_packets(self): """Print the contents of Tranalyzer2 `_packets.txt` file. Raises ------ RuntimeError If the packet file could not be found. See Also -------- T2.packet_file : Return the filename of the packet file. T2.packets : Return Tranalyzer2 `_packets.txt` file contents as an array of JSON objects. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run(packet_mode=True) >>> t2.print_packets() %pktNo flowInd ... 1 1 ... 2 1 ... ... """ name = self.packet_file() if not name or not isfile(name): raise RuntimeError("Run Tranalyzer with 'packet_mode=True' plugin to create " "a packet file") with open(name, encoding='utf-8') as f: print(f.read()) def print_log(self): """Print the contents of Tranalyzer2 `_log.txt` file. Raises ------ RuntimeError If the log file could not be found. See Also -------- T2.print_log : Alias for `T2.print_report()`. T2.log_file : Return the filename of the log file. T2.log : Return Tranalyzer2 `_log.txt` file contents. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> t2.print_log() """ name = self.log_file() if not name or not isfile(name): raise RuntimeError("Run Tranalyzer to create a log file") with open(name, encoding='utf-8') as f: print(f.read()) def print_report(self): """Print the contents of Tranalyzer2 `_log.txt` file. Raises ------ RuntimeError If the log file could not be found. See Also -------- T2.print_report : Alias for `T2.print_log()`. T2.log_file : Return the filename of the log file. T2.log : Return Tranalyzer2 `_log.txt` file contents. T2.report : Alias for `T2.log()`. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> t2.print_report() """ self.print_log() def print_monitoring(self): """Print the contents of Tranalyzer2 `_monitoring.txt` file. Raises ------ RuntimeError If the monitoring file could not be found. See Also -------- T2.monitoring_file : Return the filename of the monitoring file. T2.monitoring : Return Tranalyzer2 `_monitoring.txt` file contents. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.tranalyzer2.MACHINE_REPORT = 1 >>> t2.tranalyzer2.apply_changes() >>> t2.build() >>> t2.run() >>> t2.print_monitoring() %repTyp sensorID ... USR1MR_A 666 ... USR1MR_A 666 ... ... """ name = self.monitoring_file() if not name or not isfile(name): raise RuntimeError("Run Tranalyzer with 'save_monitoring=True' to create " "a monitoring file") with open(name, encoding='utf-8') as f: print(f.read()) # ========================================================================= def follow_stream( self, flow: int, output_format: Union[int, str] = 2, direction: str = None, payload_format: Union[int, str] = 4, reassembly: bool = True, colors: bool = True ) -> Union[str, List[str], List[bytearray], List[Dict[str, Any]]]: """Return the reassembled payload of the flow with specified `index` and `direction`. Parameters ---------- flow : int Index (`flowInd`) of the flow to analyze. output_format : int or str, default: 2 The format to use for the output: - 0 or `'payload_only'` - 1 or `'info'`: Prefix each payload with packet/flow info - 2 or `'json'` - 3 or `'reconstruct'` direction : str, default: None The direction to follow (`'A'` or `'B'`). Use `None` to follow both directions. payload_format : int or str, default: 4 The format to use for the payload: - 0 or 'ascii' - 1 or 'hexdump' - 2, 'raw' or 'binary' - 3 or 'base64' - 4 or 'bytearray' reassembly : bool, default: True Analyze TCP sequence numbers, reassemble and reorder TCP segments. colors : bool, default: True Output colors. Returns ------- list of bytearray If `output_format` is 0 or 2 and `payload_format` is 4 or `'bytearray'`. list of dict If `output_format` is 2. list of str If `output_format` is 0 and `payload_format` is not 4 or `'bytearray'`. str If none of the above applied. Raises ------ RuntimeError - If the `_packets.txt` file could not be found. - If `output_format` or `payload_format` was out of bound. - If `direction` was not `'A'`, `'B'` or `None`. - If `output_format` was 3 or `'reconstruct'` and `direction` was not `'A'` or `'B'`. - If `payload_format` was 4 or `'bytearray'` and `output_format` was 1 or `'info'`. subprocess.CalledProcessError If the process exited with a non-zero exit code. See Also -------- t2py.T2Utils.T2Utils.follow_stream : Call `tawk` `follow_stream()` function with the specified options. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', ... plugins=['basicFlow', 'basicStats', 'tcpFlags', 'txtSink'], ... packet_mode=True) >>> t2.tranalyzer2.SPKTMD_PCNTH = 1 >>> t2.tranalyzer2.apply_changes() >>> t2.build() >>> t2.run() >>> t2.follow_stream(1, direction='B') [...,{"flow": 1, "packet": 125, ..., "payload": bytearray(b'226 Transfer complete.\\r\\n')}, ...] """ name = self.packet_file() if not name or not isfile(name): raise RuntimeError("Run Tranalyzer with 'packet_mode=True' to create a packet file") return T2Utils.follow_stream(name, flow, output_format, direction, payload_format, reassembly, colors) # ========================================================================= def to_pandas( self, infile: str = None, delimiter: str = None ) -> "pandas.core.frame.DataFrame": """Convert the flow file or `infile` to pandas `DataFrame`. Parameters ---------- infile : str Path to a flow or packet file. delimiter : str, default: None Field delimiter used in the input file. Returns ------- pd.DataFrame DataFrame holding the tabular data stored in `infile`. Raises ------ RuntimeError If `infile` could not be found. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> df = t2.to_pandas() >>> type(df) <class 'pandas.core.frame.DataFrame'> """ infile = infile if infile else self.flow_file() if not infile: raise RuntimeError('Run Tranalyzer with the txtSink or jsonSink plugin to create ' 'a flow file') return T2Utils.to_pandas(infile, delimiter) # ========================================================================= # Private functions # ========================================================================= def _output_prefix(self) -> str: """Return the output prefix (derived from Tranalyzer2 input if not set). Returns ------- str The output_prefix. """ if self.output_prefix: if not isdir(self.output_prefix) or not self.pcap: return self.output_prefix filename = self.pcap.split('/')[-1] if '.' in filename: return join(self.output_prefix, '.'.join(filename.split('.')[:-1])) return join(self.output_prefix, filename) if self.pcap: if '.' in self.pcap: return '.'.join(self.pcap.split('.')[:-1]) return self.pcap return None def _add_dict_entry(self, plugin: str): if plugin not in self.__dict__: self.__dict__[plugin] = self._plugins[plugin] def _add_dict_entries(self, plugins: List[str]): for plugin in plugins: self._add_dict_entry(plugin) def _remove_dict_entry(self, plugin: str): if plugin in self.__dict__: del self.__dict__[plugin] def _remove_dict_entries(self, plugins: List[str]): for plugin in plugins: self._remove_dict_entry(plugin)Manage several
t2py.T2Pluginobjects and run T2, convert/display flow file, …Parameters
pcap:str, default: None- Path to a pcap file.
iface:str, default: None- Name of a network interface.
pcap_list:strorlistofstr, default: None-
- Path to a list of pcap files, e.g.,
'/tmp/myPcaps.txt'. - Or list of path to pcap files, e.g.,
['file1.pcap', 'file2.pcap'].
- Path to a list of pcap files, e.g.,
output_prefix:str, default: None- If
None, automatically derived from input. save_monitoring:bool, default: False- Save the monitoring report in a
_monitoring.txtfile. packet_mode:bool, default: False- Activate Tranalyzer2 packet mode.
plugin_folder:str, default: None-
Path to the plugin folder.
If
None, default to$HOME/.tranalyzer/plugins. loading_list:str, default: None- Path to a plugin loading list.
plugins:listofstr, default: None,- A list of plugin names to load.
output_format:strorlistofstr, default: None-
Select and configure the sink plugin(s) required to generate the requested output format.
See also
T2.add_output_format(). bpf:str, default: None- A BPF filter.
streaming:bool, default: False- If
True, thenT2.stream()can be used to yield one flow at a time.
Raises
OSError- If specified interface
ifacedoes not exist locally. RuntimeError- If more than one input (
pcap,pcap_list,iface) is specified.
Class variables
var plugin_folder : str-
Path to the plugin folder. If
None, default to'$HOME/.tranalyzer/plugins'.
Instance variables
prop default_plugin_folder : str-
Expand source code
@property def default_plugin_folder(self) -> str: """Path to the default plugin folder, i.e., `$HOME/.tranalyzer/plugins`.""" if not self._default_plugin_folder: self._default_plugin_folder = join(expanduser('~'), '.tranalyzer', 'plugins') return self._default_plugin_folderPath to the default plugin folder, i.e.,
$HOME/.tranalyzer/plugins. prop loading_list-
Expand source code
@property def loading_list(self): """Get or set the path to a plugin loading list. In addition, the setter calls the `T2.add_plugin()` function for each plugin listed in the file. """ return self._loading_listGet or set the path to a plugin loading list.
In addition, the setter calls the
T2.add_plugin()function for each plugin listed in the file. prop plugins : Dict[str, T2Plugin]-
Expand source code
@property def plugins(self) -> Dict[str, T2Plugin]: """Dictionary of `t2py.T2Plugin` objects whose key is the plugin name.""" return self._pluginsDictionary of
t2py.T2Pluginobjects whose key is the plugin name. prop streaming-
Expand source code
@property def streaming(self): """Allow `tranalyzer2` to run in streaming mode. Call `T2.stream()` to access flows one by one. """ return self._streamingAllow
tranalyzer2to run in streaming mode.Call
T2.stream()to access flows one by one. prop t2_exec : str-
Expand source code
@property def t2_exec(self) -> str: """Path to the Tranalyzer2 executable.""" if self.plugin_folder: return join(self.plugin_folder, 'bin', 'tranalyzer') return join(self.default_plugin_folder, 'bin', 'tranalyzer')Path to the Tranalyzer2 executable.
prop tranalyzer2 : T2Plugin-
Expand source code
@property def tranalyzer2(self) -> T2Plugin: """`t2py.T2Plugin` object representing Tranalyzer2 (core).""" if not self._tranalyzer2: self._tranalyzer2 = T2Plugin('tranalyzer2') return self._tranalyzer2t2py.T2Pluginobject representing Tranalyzer2 (core).
Methods
def add_output_format(self, extension: str | List[str])-
Expand source code
def add_output_format( self, extension: Union[str, List[str]] ): """Select and configure the sink plugin(s) required to generate the requested output format. Parameters ---------- extension : str or list of str `extension` may be any of: - `'bin'` -> binSink - `'bin.gz'` -> binSink + `BFS_GZ_COMPRESS = 1` - `'csv'` -> txtSink + `SEP_CHR = ','` - `'csv.gz'` -> txtSink + `SEP_CHR = ','` + `TFS_GZ_COMPRESS = 1` - `'json'` -> jsonSink - `'json.gz'` -> jsonSink + `JSON_GZ_COMPRESS = 1` - `'txt'` -> txtSink - `'txt.gz'` -> txtSink + `TFS_GZ_COMPRESS = 1` - `'netflow'` -> netflowSink - `'mysql'` -> mysqlSink - `'sqlite'` -> sqliteSink - `'postgres'` -> psqlSink - `'mongo'` -> mongoSink - `'socket'` -> socketSink - `'streaming'` -> socketSink + `SKS_CONTENT_TYPE = 2` - `'pcap'` -> pcapd Raises ------ RuntimeError If one of the specified extension is not supported. Examples -------- >>> t2 = T2() >>> t2.add_output_format(['json.gz', 'txt']) """ if isinstance(extension, list): extensions = extension else: extensions = [extension] for ext in extensions: if ext == 'bin': self.add_plugin('binSink') elif ext == 'bin.gz': self.add_plugin('binSink') self.binSink.BFS_GZ_COMPRESS = 1 elif ext == 'csv': self.add_plugin('txtSink') self.txtSink.SEP_CHR = ',' elif ext == 'csv.gz': self.add_plugin('txtSink') self.txtSink.SEP_CHR = ',' self.txtSink.TFS_GZ_COMPRESS = 1 elif ext == 'txt': self.add_plugin('txtSink') elif ext == 'txt.gz': self.add_plugin('txtSink') self.txtSink.TFS_GZ_COMPRESS = 1 elif ext == 'json': self.add_plugin('jsonSink') elif ext == 'json.gz': self.add_plugin('jsonSink') self.jsonSink.JSON_GZ_COMPRESS = 1 elif ext == 'netflow': self.add_plugin('netflowSink') elif ext == 'mysql': self.add_plugin('mysqlSink') elif ext == 'sqlite': self.add_plugin('sqliteSink') elif ext == 'postgres': self.add_plugin('psqlSink') elif ext == 'mongo': self.add_plugin('mongoSink') elif ext == 'socket': self.add_plugin('socketSink') elif ext == 'pcap': self.add_plugin('pcapd') elif ext == 'streaming': self.streaming = True else: raise RuntimeError(f"output format '{ext}' not supported")Select and configure the sink plugin(s) required to generate the requested output format.
Parameters
extension:strorlistofstr-
extensionmay be any of:'bin'-> binSink'bin.gz'-> binSink +BFS_GZ_COMPRESS = 1'csv'-> txtSink +SEP_CHR = ',''csv.gz'-> txtSink +SEP_CHR = ','+TFS_GZ_COMPRESS = 1'json'-> jsonSink'json.gz'-> jsonSink +JSON_GZ_COMPRESS = 1'txt'-> txtSink'txt.gz'-> txtSink +TFS_GZ_COMPRESS = 1'netflow'-> netflowSink'mysql'-> mysqlSink'sqlite'-> sqliteSink'postgres'-> psqlSink'mongo'-> mongoSink'socket'-> socketSink'streaming'-> socketSink +SKS_CONTENT_TYPE = 2'pcap'-> pcapd
Raises
RuntimeError- If one of the specified extension is not supported.
Examples
>>> t2 = T2() >>> t2.add_output_format(['json.gz', 'txt']) def add_plugin(self, plugin: str)-
Expand source code
def add_plugin( self, plugin: str ): """Add a plugin to the list of plugins. Parameters ---------- plugin : str The name of the plugin to add. See Also -------- T2.add_plugins : Add a list of plugins to the list of plugins. T2.remove_plugin : Remove a plugin from the list of plugins. T2.remove_plugins : Remove a list of plugins from the list of plugins. Examples -------- >>> t2 = T2() >>> t2.add_plugin('arpDecode') >>> type(t2.arpDecode) <class 't2py.T2Plugin.T2Plugin'> >>> t2.arpDecode.number '179' """ if plugin not in self._plugins: self._plugins[plugin] = T2Plugin(plugin) self._add_dict_entry(plugin)Add a plugin to the list of plugins.
Parameters
plugin:str- The name of the plugin to add.
See Also
T2.add_plugins()- Add a list of plugins to the list of plugins.
T2.remove_plugin()- Remove a plugin from the list of plugins.
T2.remove_plugins()- Remove a list of plugins from the list of plugins.
Examples
>>> t2 = T2() >>> t2.add_plugin('arpDecode') >>> type(t2.arpDecode) <class 't2py.T2Plugin.T2Plugin'> >>> t2.arpDecode.number '179' def add_plugins(self, plugins: List[str])-
Expand source code
def add_plugins( self, plugins: List[str] ): """Add a list of plugins to the list of plugins. Parameters ---------- plugins : list of str The list of plugins to add. See Also -------- T2.add_plugin : Add a plugin to the list of plugins. T2.remove_plugin : Remove a plugin from the list of plugins. T2.remove_plugins : Remove a list of plugins from the list of plugins. Examples -------- >>> t2 = T2() >>> t2.add_plugins(['arpDecode', 'basicStats']) >>> type(t2.arpDecode) <class 't2py.T2Plugin.T2Plugin'> >>> t2.basicStats.description 'Basic statistics' """ for plugin in plugins: self.add_plugin(plugin)Add a list of plugins to the list of plugins.
Parameters
plugins:listofstr- The list of plugins to add.
See Also
T2.add_plugin()- Add a plugin to the list of plugins.
T2.remove_plugin()- Remove a plugin from the list of plugins.
T2.remove_plugins()- Remove a list of plugins from the list of plugins.
Examples
>>> t2 = T2() >>> t2.add_plugins(['arpDecode', 'basicStats']) >>> type(t2.arpDecode) <class 't2py.T2Plugin.T2Plugin'> >>> t2.basicStats.description 'Basic statistics' def apply_changes(self, verbose: bool = False)-
Expand source code
def apply_changes( self, verbose: bool = False ): """Apply/persist the current configuration to the source files. Parameters ---------- verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ subprocess.CalledProcessError If one of the processes exited with a non-zero exit code. Examples -------- >>> t2 = T2(output_format='csv.gz') >>> t2.apply_changes() """ self.tranalyzer2.apply_changes(verbose=verbose) for plugin in self._plugins.values(): plugin.apply_changes(verbose=verbose)Apply/persist the current configuration to the source files.
Parameters
verbose:bool, default: False-
- If
True, print the output (stdoutandstderr) of the command. - If
False, do not print anything.
- If
Raises
subprocess.CalledProcessError- If one of the processes exited with a non-zero exit code.
Examples
>>> t2 = T2(output_format='csv.gz') >>> t2.apply_changes() def build(self,
plugin: str | List[str] = None,
plugin_folder: str = None,
force_rebuild: bool = False,
debug: bool = False,
verbose: bool = False)-
Expand source code
def build( self, plugin: Union[str, List[str]] = None, plugin_folder: str = None, force_rebuild: bool = False, debug: bool = False, verbose: bool = False ): """Build Tranalyzer2 and the plugins. Parameters ---------- plugin : str or list of str, default: None List of plugin names to build or path to a plugin loading list. If `None`, use `T2.list_plugins()`. plugin_folder : str, default: None Path to the plugin folder. If `None`, use `T2.plugin_folder`. force_rebuild : bool, default: False Force the rebuild of the configuration files (`$ t2build -r ...`). debug : bool, default: False Build the plugin in debug mode. verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ subprocess.CalledProcessError If the process exited with a non-zero exit code. See Also -------- T2.clean : Remove files generated by `T2.build()`. Examples -------- >>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags']) >>> t2.build() """ plugin_folder = plugin_folder if plugin_folder else self.plugin_folder to_build = plugin if plugin else list(self._plugins.keys()) if not plugin: to_build.append('tranalyzer2') T2Utils.build(to_build, plugin_folder, force_rebuild, debug, verbose=verbose)Build Tranalyzer2 and the plugins.
Parameters
plugin:strorlistofstr, default: None-
List of plugin names to build or path to a plugin loading list.
If
None, useT2.list_plugins(). plugin_folder:str, default: None-
Path to the plugin folder.
If
None, useT2.plugin_folder. force_rebuild:bool, default: False- Force the rebuild of the configuration files (
$ t2build -r ...). debug:bool, default: False- Build the plugin in debug mode.
verbose:bool, default: False-
- If
True, print the output (stdoutandstderr) of the command. - If
False, do not print anything.
- If
Raises
subprocess.CalledProcessError- If the process exited with a non-zero exit code.
See Also
T2.clean()- Remove files generated by
T2.build().
Examples
>>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags']) >>> t2.build() def clean(self, plugin: str | List[str] = None, verbose: bool = False)-
Expand source code
def clean( self, plugin: Union[str, List[str]] = None, verbose: bool = False ): """Clean Tranalyzer2 and the plugins. Parameters ---------- plugin : str or list of str, default: None List of plugin names to clean, or path to a plugin loading list. If `None`, use `T2.list_plugins()`. verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ subprocess.CalledProcessError If the process exited with a non-zero exit code. See Also -------- T2.build : Build Tranalyzer2 and the plugins. Examples -------- >>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags']) >>> t2.build() >>> t2.clean() """ to_clean = plugin if plugin else list(self._plugins.keys()) if not plugin: to_clean.append('tranalyzer2') T2Utils.clean(to_clean, verbose=verbose)Clean Tranalyzer2 and the plugins.
Parameters
plugin:strorlistofstr, default: None-
List of plugin names to clean, or path to a plugin loading list.
If
None, useT2.list_plugins(). verbose:bool, default: False-
- If
True, print the output (stdoutandstderr) of the command. - If
False, do not print anything.
- If
Raises
subprocess.CalledProcessError- If the process exited with a non-zero exit code.
See Also
T2.build()- Build Tranalyzer2 and the plugins.
Examples
>>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags']) >>> t2.build() >>> t2.clean() def clear_plugins(self)-
Expand source code
def clear_plugins(self): """Clear the list of plugins. Examples -------- >>> t2 = T2(plugins=['arpDecode', 'basicStats']) >>> t2.clear_plugins() >>> t2.list_plugins() [] """ self._remove_dict_entries(list(self._plugins.keys())) self._plugins.clear()Clear the list of plugins.
Examples
>>> t2 = T2(plugins=['arpDecode', 'basicStats']) >>> t2.clear_plugins() >>> t2.list_plugins() [] def create_plugin_list(self, plugins: List[str] = None, outfile: str = None, verbose: bool = False)-
Expand source code
def create_plugin_list( self, plugins: List[str] = None, outfile: str = None, verbose: bool = False ): """Create a loading list of plugins. Parameters ---------- plugins : list of str, default: None List of plugin names. If `None`, use `T2.list_plugins()`. outfile : str, default: None Name of the file to save. If `None`, default to `'plugins.load'` in the plugin folder. verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ subprocess.CalledProcessError If the process exited with a non-zero exit code. Examples -------- >>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags']) >>> t2.create_plugin_list() >>> t2.create_plugin_list(outfile='/tmp/myPlugins.txt') >>> t2.create_plugin_list(['basicFlow', 'txtSink']) >>> t2.create_plugin_list(['basicFlow', 'txtSink'], '/tmp/myPlugins.txt') """ if not outfile: if self.plugin_folder: outfile = join(self.plugin_folder, 'plugins.load') else: outfile = join(self.default_plugin_folder, 'plugins.load') if not plugins: plugins = list(self._plugins.keys()) T2Utils.create_plugin_list(plugins, outfile, verbose=verbose) self.loading_list = outfileCreate a loading list of plugins.
Parameters
plugins:listofstr, default: None-
List of plugin names.
If
None, useT2.list_plugins(). outfile:str, default: None-
Name of the file to save.
If
None, default to'plugins.load'in the plugin folder. verbose:bool, default: False-
- If
True, print the output (stdoutandstderr) of the command. - If
False, do not print anything.
- If
Raises
subprocess.CalledProcessError- If the process exited with a non-zero exit code.
Examples
>>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags']) >>> t2.create_plugin_list() >>> t2.create_plugin_list(outfile='/tmp/myPlugins.txt') >>> t2.create_plugin_list(['basicFlow', 'txtSink']) >>> t2.create_plugin_list(['basicFlow', 'txtSink'], '/tmp/myPlugins.txt') def discard_changes(self)-
Expand source code
def discard_changes(self): """Discard all non-committed changes made in this session. Examples -------- >>> t2 = T2(plugins=['arpDecode', 'txtSink']) >>> t2.arpDecode.MAX_IP = 5 >>> t2.discard_changes() """ for plugin in self._plugins.values(): plugin.discard_changes()Discard all non-committed changes made in this session.
Examples
>>> t2 = T2(plugins=['arpDecode', 'txtSink']) >>> t2.arpDecode.MAX_IP = 5 >>> t2.discard_changes() def flow_file(self) ‑> str-
Expand source code
def flow_file(self) -> str: """Return the filename of the JSON (preferred) or TXT flow file. Returns ------- str The filename of the flow file. See Also -------- T2.flows : Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objects. T2.print_flows : Print the contents of Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file. T2.flow_file_json : Return the filename of the JSON flow file. T2.flows_json : Return Tranalyzer2 `_flows.json` file contents as an array of JSON objects. T2.print_flows_json : Print the contents of Tranalyzer2 `_flows.json` file. T2.flow_file_txt : Return the filename of the TXT flow file. T2.flows_txt : Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objects. T2.print_flows_txt : Print the contents of Tranalyzer2 `_flows.txt` file. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', output_format=['json', 'txt']) >>> t2.run() >>> t2.flow_file() '/tmp/file_flows.json' """ if 'jsonSink' in self._plugins: return self.flow_file_json() if 'txtSink' in self._plugins: return self.flow_file_txt() return NoneReturn the filename of the JSON (preferred) or TXT flow file.
Returns
str- The filename of the flow file.
See Also
T2.flows()- Return Tranalyzer2
_flows.json(preferred) or_flows.txtfile contents as an array of JSON objects. T2.print_flows()- Print the contents of Tranalyzer2
_flows.json(preferred) or_flows.txtfile. T2.flow_file_json()- Return the filename of the JSON flow file.
T2.flows_json()- Return Tranalyzer2
_flows.jsonfile contents as an array of JSON objects. T2.print_flows_json()- Print the contents of Tranalyzer2
_flows.jsonfile. T2.flow_file_txt()- Return the filename of the TXT flow file.
T2.flows_txt()- Return Tranalyzer2
_flows.txtfile contents as an array of JSON objects. T2.print_flows_txt()- Print the contents of Tranalyzer2
_flows.txtfile.
Examples
>>> t2 = T2(pcap='/tmp/file.pcap', output_format=['json', 'txt']) >>> t2.run() >>> t2.flow_file() '/tmp/file_flows.json' def flow_file_json(self) ‑> str-
Expand source code
def flow_file_json(self) -> str: """Return the filename of the JSON flow file. Returns ------- str The filename of the JSON flow file. See Also -------- T2.flows_json : Return Tranalyzer2 `_flows.json` file contents as an array of JSON objects. T2.print_flows_json : Print the contents of Tranalyzer2 `_flows.json` file. T2.flow_file_txt : Return the filename of the TXT flow file. T2.flows_txt : Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objects. T2.print_flows_txt : Print the contents of Tranalyzer2 `_flows.txt` file. T2.flow_file : Return the filename of the JSON (preferred) or TXT flow file. T2.flows : Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objects. T2.print_flows : Print the contents of Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', output_format='json') >>> t2.run() >>> t2.flow_file_json() '/tmp/file_flows.json' """ output_prefix = self._output_prefix() return output_prefix + '_flows.json' if output_prefix else NoneReturn the filename of the JSON flow file.
Returns
str- The filename of the JSON flow file.
See Also
T2.flows_json()- Return Tranalyzer2
_flows.jsonfile contents as an array of JSON objects. T2.print_flows_json()- Print the contents of Tranalyzer2
_flows.jsonfile. T2.flow_file_txt()- Return the filename of the TXT flow file.
T2.flows_txt()- Return Tranalyzer2
_flows.txtfile contents as an array of JSON objects. T2.print_flows_txt()- Print the contents of Tranalyzer2
_flows.txtfile. T2.flow_file()- Return the filename of the JSON (preferred) or TXT flow file.
T2.flows()- Return Tranalyzer2
_flows.json(preferred) or_flows.txtfile contents as an array of JSON objects. T2.print_flows()- Print the contents of Tranalyzer2
_flows.json(preferred) or_flows.txtfile.
Examples
>>> t2 = T2(pcap='/tmp/file.pcap', output_format='json') >>> t2.run() >>> t2.flow_file_json() '/tmp/file_flows.json' def flow_file_txt(self) ‑> str-
Expand source code
def flow_file_txt(self) -> str: """Return the filename of the TXT flow file. Returns ------- str The filename of the TXT flow file. See Also -------- T2.flows_txt : Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objects. T2.print_flows_txt : Print the contents of Tranalyzer2 `_flows.txt` file. T2.flow_file_json : Return the filename of the JSON flow file. T2.flows_json : Return Tranalyzer2 `_flows.json` file contents as an array of JSON objects. T2.print_flows_json : Print the contents of Tranalyzer2 `_flows.json` file. T2.flow_file : Return the filename of the JSON (preferred) or TXT flow file. T2.flows : Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objects. T2.print_flows : Print the contents of Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', output_format='csv') >>> t2.run() >>> t2.flow_file_txt() '/tmp/file_flows.txt' """ output_prefix = self._output_prefix() return output_prefix + '_flows.txt' if output_prefix else NoneReturn the filename of the TXT flow file.
Returns
str- The filename of the TXT flow file.
See Also
T2.flows_txt()- Return Tranalyzer2
_flows.txtfile contents as an array of JSON objects. T2.print_flows_txt()- Print the contents of Tranalyzer2
_flows.txtfile. T2.flow_file_json()- Return the filename of the JSON flow file.
T2.flows_json()- Return Tranalyzer2
_flows.jsonfile contents as an array of JSON objects. T2.print_flows_json()- Print the contents of Tranalyzer2
_flows.jsonfile. T2.flow_file()- Return the filename of the JSON (preferred) or TXT flow file.
T2.flows()- Return Tranalyzer2
_flows.json(preferred) or_flows.txtfile contents as an array of JSON objects. T2.print_flows()- Print the contents of Tranalyzer2
_flows.json(preferred) or_flows.txtfile.
Examples
>>> t2 = T2(pcap='/tmp/file.pcap', output_format='csv') >>> t2.run() >>> t2.flow_file_txt() '/tmp/file_flows.txt' def flows(self) ‑> List[Dict[str, Any]]-
Expand source code
def flows(self) -> List[Dict[str, Any]]: """Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objects. Returns ------- list of dict The list of rows contained in the flow file. Each row is represented as a dict with the column names as key. Raises ------ RuntimeError If neither the `_flows.json` nor the `_flows.txt` file could be found. See Also -------- T2.flow_file : Return the filename of the JSON (preferred) or TXT flow file. T2.flow_file_json : Return the filename of the JSON flow file. T2.flow_file_txt : Return the filename of the TXT flow file. T2.flows_json : Return Tranalyzer2 `_flows.json` file contents as an array of JSON objects. T2.flows_txt : Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objects. T2.print_flows : Print the contents of Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file. T2.print_flows_json : Print the contents of Tranalyzer2 `_flows.json` file. T2.print_flows_txt : Print the contents of Tranalyzer2 `_flows.json` file. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> t2.flows() [{'dir': 'A', 'flowInd': 1, ...}, {'dir': 'B', 'flowInd': 1, ...}, ...] """ if 'jsonSink' in self._plugins: return self.flows_json() if 'txtSink' in self._plugins: return self.flows_txt() raise RuntimeError('Run Tranalyzer with the txtSink or jsonSink plugin to create ' 'a flow file')Return Tranalyzer2
_flows.json(preferred) or_flows.txtfile contents as an array of JSON objects.Returns
listofdict-
The list of rows contained in the flow file.
Each row is represented as a dict with the column names as key.
Raises
RuntimeError- If neither the
_flows.jsonnor the_flows.txtfile could be found.
See Also
T2.flow_file()- Return the filename of the JSON (preferred) or TXT flow file.
T2.flow_file_json()- Return the filename of the JSON flow file.
T2.flow_file_txt()- Return the filename of the TXT flow file.
T2.flows_json()- Return Tranalyzer2
_flows.jsonfile contents as an array of JSON objects. T2.flows_txt()- Return Tranalyzer2
_flows.txtfile contents as an array of JSON objects. T2.print_flows()- Print the contents of Tranalyzer2
_flows.json(preferred) or_flows.txtfile. T2.print_flows_json()- Print the contents of Tranalyzer2
_flows.jsonfile. T2.print_flows_txt()- Print the contents of Tranalyzer2
_flows.jsonfile.
Examples
>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> t2.flows() [{'dir': 'A', 'flowInd': 1, ...}, {'dir': 'B', 'flowInd': 1, ...}, ...] def flows_json(self) ‑> List[Dict[str, Any]]-
Expand source code
def flows_json(self) -> List[Dict[str, Any]]: """Return Tranalyzer2 `_flows.json` file contents as an array of JSON objects. Returns ------- list of dict The list of rows contained in the JSON flow file. Each row is represented as a dict with the column names as key. Raises ------ RuntimeError If the `_flows.json` file could not be found. See Also -------- T2.flow_file_json : Return the filename of the JSON flow file. T2.flows : Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objects. T2.flows_txt : Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objects. T2.flow_file : Return the filename of the JSON (preferred) or TXT flow file. T2.flow_file_txt : Return the filename of the TXT flow file. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'jsonSink']) >>> t2.build() >>> t2.run() >>> t2.flows_json() [{'dir': 'A', 'flowInd': 1, ...}, {'dir': 'B', 'flowInd': 1, ...}, ...] """ name = self.flow_file_json() if not name or not isfile(name): raise RuntimeError('Run Tranalyzer with the jsonSink plugin to create ' 'a JSON flow file') return T2Utils.to_json_array(name)Return Tranalyzer2
_flows.jsonfile contents as an array of JSON objects.Returns
listofdict-
The list of rows contained in the JSON flow file.
Each row is represented as a dict with the column names as key.
Raises
RuntimeError- If the
_flows.jsonfile could not be found.
See Also
T2.flow_file_json()- Return the filename of the JSON flow file.
T2.flows()- Return Tranalyzer2
_flows.json(preferred) or_flows.txtfile contents as an array of JSON objects. T2.flows_txt()- Return Tranalyzer2
_flows.txtfile contents as an array of JSON objects. T2.flow_file()- Return the filename of the JSON (preferred) or TXT flow file.
T2.flow_file_txt()- Return the filename of the TXT flow file.
Examples
>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'jsonSink']) >>> t2.build() >>> t2.run() >>> t2.flows_json() [{'dir': 'A', 'flowInd': 1, ...}, {'dir': 'B', 'flowInd': 1, ...}, ...] def flows_txt(self, delimiter: str = None) ‑> List[Dict[str, Any]]-
Expand source code
def flows_txt( self, delimiter: str = None ) -> List[Dict[str, Any]]: """Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objects. Parameters ---------- delimiter : str, default: None The delimiter used to separate columns in the input file. If `None`, default to '\\t' if the txtSink plugin is not loaded else to the value of `txtSink.SEP_CHR`. Returns ------- list of dict The list of rows contained in the TXT flow file. Each row is represented as a dict with the column names as key. Raises ------ RuntimeError If the `_flows.txt` file could not be found. See Also -------- T2.flow_file_txt : Return the filename of the TXT flow file. T2.flows : Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objects. T2.flows_json : Return Tranalyzer2 `_flows.json` file contents as an array of JSON objects. T2.flow_file : Return the filename of the JSON (preferred) or TXT flow file. T2.flow_file_json : Return the filename of the JSON flow file. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> t2.flows_txt() [{'dir': 'A', 'flowInd': 1, ...}, {'dir': 'B', 'flowInd': 1, ...}, ...] """ name = self.flow_file_txt() if not name or not isfile(name): raise RuntimeError('Run Tranalyzer with the txtSink plugin to create a flow file') if delimiter is None: if 'txtSink' not in self._plugins: delimiter = '\t' else: delimiter = self.txtSink.SEP_CHR.strip('"') if delimiter[0] == '\\': if delimiter[1] == 't': delimiter = '\t' elif delimiter[1] == 'n': delimiter = '\n' elif delimiter[1] == 'r': delimiter = '\r' elif delimiter[1] == '\\': delimiter = '\\' return T2Utils.to_json_array(name, delimiter)Return Tranalyzer2
_flows.txtfile contents as an array of JSON objects.Parameters
delimiter:str, default: None-
The delimiter used to separate columns in the input file.
If
None, default to '\t' if the txtSink plugin is not loaded else to the value oftxtSink.SEP_CHR.
Returns
listofdict-
The list of rows contained in the TXT flow file.
Each row is represented as a dict with the column names as key.
Raises
RuntimeError- If the
_flows.txtfile could not be found.
See Also
T2.flow_file_txt()- Return the filename of the TXT flow file.
T2.flows()- Return Tranalyzer2
_flows.json(preferred) or_flows.txtfile contents as an array of JSON objects. T2.flows_json()- Return Tranalyzer2
_flows.jsonfile contents as an array of JSON objects. T2.flow_file()- Return the filename of the JSON (preferred) or TXT flow file.
T2.flow_file_json()- Return the filename of the JSON flow file.
Examples
>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> t2.flows_txt() [{'dir': 'A', 'flowInd': 1, ...}, {'dir': 'B', 'flowInd': 1, ...}, ...] def follow_stream(self,
flow: int,
output_format: int | str = 2,
direction: str = None,
payload_format: int | str = 4,
reassembly: bool = True,
colors: bool = True) ‑> str | List[str] | List[bytearray] | List[Dict[str, Any]]-
Expand source code
def follow_stream( self, flow: int, output_format: Union[int, str] = 2, direction: str = None, payload_format: Union[int, str] = 4, reassembly: bool = True, colors: bool = True ) -> Union[str, List[str], List[bytearray], List[Dict[str, Any]]]: """Return the reassembled payload of the flow with specified `index` and `direction`. Parameters ---------- flow : int Index (`flowInd`) of the flow to analyze. output_format : int or str, default: 2 The format to use for the output: - 0 or `'payload_only'` - 1 or `'info'`: Prefix each payload with packet/flow info - 2 or `'json'` - 3 or `'reconstruct'` direction : str, default: None The direction to follow (`'A'` or `'B'`). Use `None` to follow both directions. payload_format : int or str, default: 4 The format to use for the payload: - 0 or 'ascii' - 1 or 'hexdump' - 2, 'raw' or 'binary' - 3 or 'base64' - 4 or 'bytearray' reassembly : bool, default: True Analyze TCP sequence numbers, reassemble and reorder TCP segments. colors : bool, default: True Output colors. Returns ------- list of bytearray If `output_format` is 0 or 2 and `payload_format` is 4 or `'bytearray'`. list of dict If `output_format` is 2. list of str If `output_format` is 0 and `payload_format` is not 4 or `'bytearray'`. str If none of the above applied. Raises ------ RuntimeError - If the `_packets.txt` file could not be found. - If `output_format` or `payload_format` was out of bound. - If `direction` was not `'A'`, `'B'` or `None`. - If `output_format` was 3 or `'reconstruct'` and `direction` was not `'A'` or `'B'`. - If `payload_format` was 4 or `'bytearray'` and `output_format` was 1 or `'info'`. subprocess.CalledProcessError If the process exited with a non-zero exit code. See Also -------- t2py.T2Utils.T2Utils.follow_stream : Call `tawk` `follow_stream()` function with the specified options. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', ... plugins=['basicFlow', 'basicStats', 'tcpFlags', 'txtSink'], ... packet_mode=True) >>> t2.tranalyzer2.SPKTMD_PCNTH = 1 >>> t2.tranalyzer2.apply_changes() >>> t2.build() >>> t2.run() >>> t2.follow_stream(1, direction='B') [...,{"flow": 1, "packet": 125, ..., "payload": bytearray(b'226 Transfer complete.\\r\\n')}, ...] """ name = self.packet_file() if not name or not isfile(name): raise RuntimeError("Run Tranalyzer with 'packet_mode=True' to create a packet file") return T2Utils.follow_stream(name, flow, output_format, direction, payload_format, reassembly, colors)Return the reassembled payload of the flow with specified
indexanddirection.Parameters
flow:int- Index (
flowInd) of the flow to analyze. output_format:intorstr, default: 2-
The format to use for the output:
- 0 or
'payload_only' - 1 or
'info': Prefix each payload with packet/flow info - 2 or
'json' - 3 or
'reconstruct'
- 0 or
direction:str, default: None-
The direction to follow (
'A'or'B').Use
Noneto follow both directions. payload_format:intorstr, default: 4-
The format to use for the payload:
- 0 or 'ascii'
- 1 or 'hexdump'
- 2, 'raw' or 'binary'
- 3 or 'base64'
- 4 or 'bytearray'
reassembly:bool, default: True- Analyze TCP sequence numbers, reassemble and reorder TCP segments.
colors:bool, default: True- Output colors.
Returns
listofbytearray- If
output_formatis 0 or 2 andpayload_formatis 4 or'bytearray'. listofdict- If
output_formatis 2. listofstr- If
output_formatis 0 andpayload_formatis not 4 or'bytearray'. str- If none of the above applied.
Raises
RuntimeError-
- If the
_packets.txtfile could not be found. - If
output_formatorpayload_formatwas out of bound. - If
directionwas not'A','B'orNone. - If
output_formatwas 3 or'reconstruct'anddirectionwas not'A'or'B'. - If
payload_formatwas 4 or'bytearray'andoutput_formatwas 1 or'info'.
- If the
subprocess.CalledProcessError- If the process exited with a non-zero exit code.
See Also
T2Utils.follow_stream()- Call
tawkfollow_stream()function with the specified options.
Examples
>>> t2 = T2(pcap='/tmp/file.pcap', ... plugins=['basicFlow', 'basicStats', 'tcpFlags', 'txtSink'], ... packet_mode=True) >>> t2.tranalyzer2.SPKTMD_PCNTH = 1 >>> t2.tranalyzer2.apply_changes() >>> t2.build() >>> t2.run() >>> t2.follow_stream(1, direction='B') [...,{"flow": 1, "packet": 125, ..., "payload": bytearray(b'226 Transfer complete.\r\n')}, ...] def headers(self) ‑> str-
Expand source code
def headers(self) -> str: """Return Tranalyzer2 `_headers.txt` file contents. Returns ------- str The raw contents of the header file. Raises ------ RuntimeError If the `_headers.txt` file could not be found. See Also -------- T2.headers_file : Return the filename of the headers file. T2.print_headers : Print the contents of Tranalyzer2 `_headers.txt` file. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> t2.headers() '# Date: 1628090651.486362 sec (Wed 04 Aug 2021 17:24:11 CEST)...' """ name = self.headers_file() if not name or not isfile(name): raise RuntimeError('Run Tranalyzer with the txtSink plugin to create a headers file') return self._load_file(name)Return Tranalyzer2
_headers.txtfile contents.Returns
str- The raw contents of the header file.
Raises
RuntimeError- If the
_headers.txtfile could not be found.
See Also
T2.headers_file()- Return the filename of the headers file.
T2.print_headers()- Print the contents of Tranalyzer2
_headers.txtfile.
Examples
>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> t2.headers() '# Date: 1628090651.486362 sec (Wed 04 Aug 2021 17:24:11 CEST)...' def headers_file(self) ‑> str-
Expand source code
def headers_file(self) -> str: """Return the filename of the headers file. Returns ------- str The filename of the headers file. See Also -------- T2.headers : Return Tranalyzer2 `_headers.txt` file contents. T2.print_headers : Print the contents of Tranalyzer2 `_headers.txt` file. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.run() >>> t2.headers_file() '/tmp/file_headers.txt' """ output_prefix = self._output_prefix() return output_prefix + '_headers.txt' if output_prefix else NoneReturn the filename of the headers file.
Returns
str- The filename of the headers file.
See Also
T2.headers()- Return Tranalyzer2
_headers.txtfile contents. T2.print_headers()- Print the contents of Tranalyzer2
_headers.txtfile.
Examples
>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.run() >>> t2.headers_file() '/tmp/file_headers.txt' def list_plugins(self)-
Expand source code
def list_plugins(self): """Alphabetically list the loaded plugins. Examples -------- >>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags']) >>> t2.list_plugins() ['arpDecode', 'basicStats', 'tcpFlags'] >>> t2.arpDecode.description 'Address Resolution Protocol (ARP)' """ return sorted(self._plugins.keys())Alphabetically list the loaded plugins.
Examples
>>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags']) >>> t2.list_plugins() ['arpDecode', 'basicStats', 'tcpFlags'] >>> t2.arpDecode.description 'Address Resolution Protocol (ARP)' def log(self) ‑> str-
Expand source code
def log(self) -> str: """Return Tranalyzer2 `_log.txt` file contents. Returns ------- str The raw contents of the log file. Raises ------ RuntimeError If the `_log.txt` file could not be found. See Also -------- T2.report : Alias for `T2.log()` T2.log_file : Return the filename of the log file. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> t2.log() '================================================================================...' """ return self.report()Return Tranalyzer2
_log.txtfile contents.Returns
str- The raw contents of the log file.
Raises
RuntimeError- If the
_log.txtfile could not be found.
See Also
T2.report()- Alias for
T2.log() T2.log_file()- Return the filename of the log file.
Examples
>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> t2.log() '================================================================================...' def log_file(self) ‑> str-
Expand source code
def log_file(self) -> str: """Return the filename of the log file. Returns ------- str The filename of the log file. See Also -------- T2.log : Return Tranalyzer2 `_log.txt` file contents. T2.print_log : Print the contents of Tranalyzer2 `_log.txt` file. T2.report : Alias for `T2.log()`. T2.print_report : Alias for `T2.print_log()`. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', output_format='txt') >>> t2.run() >>> t2.log_file() '/tmp/file_log.txt' """ output_prefix = self._output_prefix() return output_prefix + '_log.txt' if output_prefix else NoneReturn the filename of the log file.
Returns
str- The filename of the log file.
See Also
T2.log()- Return Tranalyzer2
_log.txtfile contents. T2.print_log()- Print the contents of Tranalyzer2
_log.txtfile. T2.report()- Alias for
T2.log(). T2.print_report()- Alias for
T2.print_log().
Examples
>>> t2 = T2(pcap='/tmp/file.pcap', output_format='txt') >>> t2.run() >>> t2.log_file() '/tmp/file_log.txt' def monitoring(self) ‑> List[Dict[str, Any]]-
Expand source code
def monitoring(self) -> List[Dict[str, Any]]: """Return Tranalyzer2 `_monitoring.txt` file contents as an array of JSON objects. Returns ------- list of dict The list of rows contained in the monitoring file. Each row is represented as a dict with the column names as key. Raises ------ RuntimeError If the `_monitoring.txt` file could not be found. See Also -------- T2.monitoring_file : Return the filename of the monitoring file. T2.print_monitoring : Print the contents of Tranalyzer2 `_monitoring.txt` file. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'], monitoring_file=True) >>> t2.tranalyzer2.MACHINE_REPORT = 1 >>> t2.tranalyzer2.apply_changes() >>> t2.build() >>> t2.run() >>> t2.monitoring() [{'repTyp': 'USR1MR_A', 'sensorID': 666, 'time': ...}, {'repTyp': 'USR1MR_A', ...}, ...] """ name = self.monitoring_file() if not name or not isfile(name): raise RuntimeError("Run Tranalyzer with 'save_monitoring=True' to create " "a monitoring file") return T2Utils.to_json_array(name)Return Tranalyzer2
_monitoring.txtfile contents as an array of JSON objects.Returns
listofdict-
The list of rows contained in the monitoring file.
Each row is represented as a dict with the column names as key.
Raises
RuntimeError- If the
_monitoring.txtfile could not be found.
See Also
T2.monitoring_file()- Return the filename of the monitoring file.
T2.print_monitoring()- Print the contents of Tranalyzer2
_monitoring.txtfile.
Examples
>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'], monitoring_file=True) >>> t2.tranalyzer2.MACHINE_REPORT = 1 >>> t2.tranalyzer2.apply_changes() >>> t2.build() >>> t2.run() >>> t2.monitoring() [{'repTyp': 'USR1MR_A', 'sensorID': 666, 'time': ...}, {'repTyp': 'USR1MR_A', ...}, ...] def monitoring_file(self) ‑> str-
Expand source code
def monitoring_file(self) -> str: """Return the filename of the monitoring file. Returns ------- str The filename of the monitoring file. See Also -------- T2.monitoring : Return Tranalyzer2 `_monitoring.txt` file contents. T2.print_monitoring : Print the contents of Tranalyzer2 `_monitoring.txt` file. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', output_format='txt', monitoring_file=True) >>> t2.run() >>> t2.monitoring_file() '/tmp/file_monitoring.txt' """ output_prefix = self._output_prefix() return output_prefix + '_monitoring.txt' if output_prefix else NoneReturn the filename of the monitoring file.
Returns
str- The filename of the monitoring file.
See Also
T2.monitoring()- Return Tranalyzer2
_monitoring.txtfile contents. T2.print_monitoring()- Print the contents of Tranalyzer2
_monitoring.txtfile.
Examples
>>> t2 = T2(pcap='/tmp/file.pcap', output_format='txt', monitoring_file=True) >>> t2.run() >>> t2.monitoring_file() '/tmp/file_monitoring.txt' def packet_file(self) ‑> str-
Expand source code
def packet_file(self) -> str: """Return the filename of the packet file. Returns ------- str The filename of the packet file. See Also -------- T2.packets : Return Tranalyzer2 `_packets.txt` file contents as an array of JSON objects. T2.print_packets : Print the contents of Tranalyzer2 `_packets.txt`. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap') >>> t2.run(packet_mode=True) >>> t2.packet_file() '/tmp/file_packets.txt' """ output_prefix = self._output_prefix() return output_prefix + '_packets.txt' if output_prefix else NoneReturn the filename of the packet file.
Returns
str- The filename of the packet file.
See Also
T2.packets()- Return Tranalyzer2
_packets.txtfile contents as an array of JSON objects. T2.print_packets()- Print the contents of Tranalyzer2
_packets.txt.
Examples
>>> t2 = T2(pcap='/tmp/file.pcap') >>> t2.run(packet_mode=True) >>> t2.packet_file() '/tmp/file_packets.txt' def packets(self) ‑> List[Dict[str, Any]]-
Expand source code
def packets(self) -> List[Dict[str, Any]]: """Return Tranalyzer2 `_packets.txt` file contents as an array of JSON objects. Returns ------- list of dict The list of rows contained in the packet file. Each row is represented as a dict with the column names as key. Raises ------ RuntimeError If the `_packets.txt` file could not be found. See Also -------- T2.packet_file : Return the filename of the packet file. T2.print_packets : Print the contents of Tranalyzer2 `_packets.txt` file. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'], packet_mode=True) >>> t2.build() >>> t2.run() >>> t2.packets() [{'pktNo': 1, 'flowInd': 1, ...}, {'pktNo': 2, 'flowInd': 1, ...}, ...] """ name = self.packet_file() if not name or not isfile(name): raise RuntimeError("Run Tranalyzer with 'packet_mode=True' to create a packet file") return T2Utils.to_json_array(name)Return Tranalyzer2
_packets.txtfile contents as an array of JSON objects.Returns
listofdict-
The list of rows contained in the packet file.
Each row is represented as a dict with the column names as key.
Raises
RuntimeError- If the
_packets.txtfile could not be found.
See Also
T2.packet_file()- Return the filename of the packet file.
T2.print_packets()- Print the contents of Tranalyzer2
_packets.txtfile.
Examples
>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink'], packet_mode=True) >>> t2.build() >>> t2.run() >>> t2.packets() [{'pktNo': 1, 'flowInd': 1, ...}, {'pktNo': 2, 'flowInd': 1, ...}, ...] def print_flows(self)-
Expand source code
def print_flows(self): """Print the contents of Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file. Raises ------ RuntimeError If the flow file could not be found. See Also -------- T2.flow_file : Return the filename of the JSON (preferred) or TXT flow file. T2.flows : Return Tranalyzer2 `_flows.json` (preferred) or `_flows.txt` file contents as an array of JSON objects. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> t2.print_flows() %dir flowInd ... A 1 ... B 1 ... ... """ name = self.flow_file() if not name or not isfile(name): raise RuntimeError('Run Tranalyzer with the txtSink plugin to create a flow file') with open(name, encoding='utf-8') as f: print(f.read())Print the contents of Tranalyzer2
_flows.json(preferred) or_flows.txtfile.Raises
RuntimeError- If the flow file could not be found.
See Also
T2.flow_file()- Return the filename of the JSON (preferred) or TXT flow file.
T2.flows()- Return Tranalyzer2
_flows.json(preferred) or_flows.txtfile contents as an array of JSON objects.
Examples
>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> t2.print_flows() %dir flowInd ... A 1 ... B 1 ... ... def print_flows_json(self)-
Expand source code
def print_flows_json(self): """Print the contents of Tranalyzer2 `_flows.json` file. Raises ------ RuntimeError If the JSON flow file could not be found. See Also -------- T2.flow_file_json : Return the filename of the JSON flow file. T2.flows_json : Return Tranalyzer2 `_flows.json` file contents as an array of JSON objects. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'jsonSink']) >>> t2.build() >>> t2.run() >>> t2.print_flows_json() {"dir":"A","flowInd":1,...} {"dir":"B","flowInd":1,...} ... """ name = self.flow_file_json() if not name or not isfile(name): raise RuntimeError('Run Tranalyzer with the jsonSink plugin to create a JSON flow file') with open(name, encoding='utf-8') as f: print(f.read())Print the contents of Tranalyzer2
_flows.jsonfile.Raises
RuntimeError- If the JSON flow file could not be found.
See Also
T2.flow_file_json()- Return the filename of the JSON flow file.
T2.flows_json()- Return Tranalyzer2
_flows.jsonfile contents as an array of JSON objects.
Examples
>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'jsonSink']) >>> t2.build() >>> t2.run() >>> t2.print_flows_json() {"dir":"A","flowInd":1,...} {"dir":"B","flowInd":1,...} ... def print_flows_txt(self)-
Expand source code
def print_flows_txt(self): """Print the contents of Tranalyzer2 `_flows.txt` file. Raises ------ RuntimeError If the flow file could not be found. See Also -------- T2.flow_file_txt : Return the filename of the TXT flow file. T2.flows_txt : Return Tranalyzer2 `_flows.txt` file contents as an array of JSON objects. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> t2.print_flows_txt() %dir flowInd ... A 1 ... B 1 ... ... """ name = self.flow_file_txt() if not name or not isfile(name): raise RuntimeError('Run Tranalyzer with the txtSink plugin to create a flow file') with open(name, encoding='utf-8') as f: print(f.read())Print the contents of Tranalyzer2
_flows.txtfile.Raises
RuntimeError- If the flow file could not be found.
See Also
T2.flow_file_txt()- Return the filename of the TXT flow file.
T2.flows_txt()- Return Tranalyzer2
_flows.txtfile contents as an array of JSON objects.
Examples
>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> t2.print_flows_txt() %dir flowInd ... A 1 ... B 1 ... ... def print_headers(self)-
Expand source code
def print_headers(self): """Print the contents of Tranalyzer2 `_headers.txt` file. Raises ------ RuntimeError If the header file could not be found. See Also -------- T2.headers_file : Return the filename of the headers file. T2.headers : Return Tranalyzer2 `_headers.txt` file contents. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> t2.print_headers() # Date: 1628090651.486362 sec (Wed 04 Aug 2021 17:24:11 CEST) # Tranalyzer 0.8.11 (Anteater), Tarantula. ... """ name = self.headers_file() if not name or not isfile(name): raise RuntimeError('Run Tranalyzer with the txtSink plugin to create a headers file') with open(name, encoding='utf-8') as f: print(f.read())Print the contents of Tranalyzer2
_headers.txtfile.Raises
RuntimeError- If the header file could not be found.
See Also
T2.headers_file()- Return the filename of the headers file.
T2.headers()- Return Tranalyzer2
_headers.txtfile contents.
Examples
>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> t2.print_headers() # Date: 1628090651.486362 sec (Wed 04 Aug 2021 17:24:11 CEST) # Tranalyzer 0.8.11 (Anteater), Tarantula. ... def print_log(self)-
Expand source code
def print_log(self): """Print the contents of Tranalyzer2 `_log.txt` file. Raises ------ RuntimeError If the log file could not be found. See Also -------- T2.print_log : Alias for `T2.print_report()`. T2.log_file : Return the filename of the log file. T2.log : Return Tranalyzer2 `_log.txt` file contents. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> t2.print_log() """ name = self.log_file() if not name or not isfile(name): raise RuntimeError("Run Tranalyzer to create a log file") with open(name, encoding='utf-8') as f: print(f.read())Print the contents of Tranalyzer2
_log.txtfile.Raises
RuntimeError- If the log file could not be found.
See Also
T2.print_log()- Alias for
T2.print_report(). T2.log_file()- Return the filename of the log file.
T2.log()- Return Tranalyzer2
_log.txtfile contents.
Examples
>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> t2.print_log() def print_monitoring(self)-
Expand source code
def print_monitoring(self): """Print the contents of Tranalyzer2 `_monitoring.txt` file. Raises ------ RuntimeError If the monitoring file could not be found. See Also -------- T2.monitoring_file : Return the filename of the monitoring file. T2.monitoring : Return Tranalyzer2 `_monitoring.txt` file contents. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.tranalyzer2.MACHINE_REPORT = 1 >>> t2.tranalyzer2.apply_changes() >>> t2.build() >>> t2.run() >>> t2.print_monitoring() %repTyp sensorID ... USR1MR_A 666 ... USR1MR_A 666 ... ... """ name = self.monitoring_file() if not name or not isfile(name): raise RuntimeError("Run Tranalyzer with 'save_monitoring=True' to create " "a monitoring file") with open(name, encoding='utf-8') as f: print(f.read())Print the contents of Tranalyzer2
_monitoring.txtfile.Raises
RuntimeError- If the monitoring file could not be found.
See Also
T2.monitoring_file()- Return the filename of the monitoring file.
T2.monitoring()- Return Tranalyzer2
_monitoring.txtfile contents.
Examples
>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.tranalyzer2.MACHINE_REPORT = 1 >>> t2.tranalyzer2.apply_changes() >>> t2.build() >>> t2.run() >>> t2.print_monitoring() %repTyp sensorID ... USR1MR_A 666 ... USR1MR_A 666 ... ... def print_packets(self)-
Expand source code
def print_packets(self): """Print the contents of Tranalyzer2 `_packets.txt` file. Raises ------ RuntimeError If the packet file could not be found. See Also -------- T2.packet_file : Return the filename of the packet file. T2.packets : Return Tranalyzer2 `_packets.txt` file contents as an array of JSON objects. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run(packet_mode=True) >>> t2.print_packets() %pktNo flowInd ... 1 1 ... 2 1 ... ... """ name = self.packet_file() if not name or not isfile(name): raise RuntimeError("Run Tranalyzer with 'packet_mode=True' plugin to create " "a packet file") with open(name, encoding='utf-8') as f: print(f.read())Print the contents of Tranalyzer2
_packets.txtfile.Raises
RuntimeError- If the packet file could not be found.
See Also
T2.packet_file()- Return the filename of the packet file.
T2.packets()- Return Tranalyzer2
_packets.txtfile contents as an array of JSON objects.
Examples
>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run(packet_mode=True) >>> t2.print_packets() %pktNo flowInd ... 1 1 ... 2 1 ... ... def print_report(self)-
Expand source code
def print_report(self): """Print the contents of Tranalyzer2 `_log.txt` file. Raises ------ RuntimeError If the log file could not be found. See Also -------- T2.print_report : Alias for `T2.print_log()`. T2.log_file : Return the filename of the log file. T2.log : Return Tranalyzer2 `_log.txt` file contents. T2.report : Alias for `T2.log()`. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> t2.print_report() """ self.print_log()Print the contents of Tranalyzer2
_log.txtfile.Raises
RuntimeError- If the log file could not be found.
See Also
T2.print_report()- Alias for
T2.print_log(). T2.log_file()- Return the filename of the log file.
T2.log()- Return Tranalyzer2
_log.txtfile contents. T2.report()- Alias for
T2.log().
Examples
>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> t2.print_report() def remove_plugin(self, plugin: str)-
Expand source code
def remove_plugin( self, plugin: str ): """Remove a plugin from the list of plugins. Parameters ---------- plugin : str The name of the plugin to remove. See Also -------- T2.add_plugin : Add a plugin to the list of plugins. T2.add_plugins : Add a list of plugins to the list of plugins. T2.remove_plugins : Remove a list of plugins from the list of plugins. Examples -------- >>> t2 = T2(plugins=['arpDecode', 'basicStats']) >>> t2.remove_plugin('arpDecode') >>> t2.list_plugins() ['basicStats'] """ self._remove_dict_entry(plugin) self._plugins.pop(plugin)Remove a plugin from the list of plugins.
Parameters
plugin:str- The name of the plugin to remove.
See Also
T2.add_plugin()- Add a plugin to the list of plugins.
T2.add_plugins()- Add a list of plugins to the list of plugins.
T2.remove_plugins()- Remove a list of plugins from the list of plugins.
Examples
>>> t2 = T2(plugins=['arpDecode', 'basicStats']) >>> t2.remove_plugin('arpDecode') >>> t2.list_plugins() ['basicStats'] def remove_plugins(self, plugins: List[str])-
Expand source code
def remove_plugins( self, plugins: List[str] ): """Remove a list of plugins from the list of plugins. Parameters ---------- plugins : list of str The list of plugins to remove. See Also -------- T2.add_plugin : Add a plugin to the list of plugins. T2.add_plugins : Add a list of plugins to the list of plugins. T2.remove_plugin : Remove a plugin from the list of plugins. Examples -------- >>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags']) >>> t2.remove_plugins(['arpDecode', 'basicStats']) >>> t2.list_plugins() ['tcpFlags'] """ for plugin in plugins: self.remove_plugin(plugin)Remove a list of plugins from the list of plugins.
Parameters
plugins:listofstr- The list of plugins to remove.
See Also
T2.add_plugin()- Add a plugin to the list of plugins.
T2.add_plugins()- Add a list of plugins to the list of plugins.
T2.remove_plugin()- Remove a plugin from the list of plugins.
Examples
>>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags']) >>> t2.remove_plugins(['arpDecode', 'basicStats']) >>> t2.list_plugins() ['tcpFlags'] def report(self) ‑> str-
Expand source code
def report(self) -> str: """Return Tranalyzer2 `_log.txt` file contents. Returns ------- str The raw contents of the log file. Raises ------ RuntimeError If the `_log.txt` file could not be found. See Also -------- T2.log_file : Return the filename of the log file. T2.log : Alias for `T2.report()` Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> t2.report() '================================================================================...' """ name = self.log_file() if not name or not isfile(name): raise RuntimeError("Run Tranalyzer to create a log file") return self._load_file(name)Return Tranalyzer2
_log.txtfile contents.Returns
str- The raw contents of the log file.
Raises
RuntimeError- If the
_log.txtfile could not be found.
See Also
T2.log_file()- Return the filename of the log file.
T2.log()- Alias for
T2.report()
Examples
>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> t2.report() '================================================================================...' def reset(self)-
Expand source code
def reset(self): """Reset all plugins' flags to their default value. Examples -------- >>> t2 = T2(plugins=['arpDecode', 'txtSink']) >>> t2.arpDecode.MAX_IP = 5 >>> t2.apply_config() >>> t2.reset() >>> t2.apply_config() """ for plugin in self._plugins.values(): plugin.reset()Reset all plugins' flags to their default value.
Examples
>>> t2 = T2(plugins=['arpDecode', 'txtSink']) >>> t2.arpDecode.MAX_IP = 5 >>> t2.apply_config() >>> t2.reset() >>> t2.apply_config() def run(self,
pcap: str = None,
iface: str = None,
pcap_list: str | List[str] = None,
output_prefix: str = None,
save_monitoring: bool = False,
packet_mode: bool = False,
plugins: List[str] = None,
plugin_folder: str = None,
loading_list: str = None,
bpf: str = None,
rebuild: bool = False,
streaming: bool = False,
timeout: int = None,
verbose: bool = False)-
Expand source code
def run( self, pcap: str = None, iface: str = None, pcap_list: Union[str, List[str]] = None, output_prefix: str = None, save_monitoring: bool = False, packet_mode: bool = False, plugins: List[str] = None, plugin_folder: str = None, loading_list: str = None, bpf: str = None, rebuild: bool = False, streaming: bool = False, timeout: int = None, verbose: bool = False ): """Run Tranalyzer2. Parameters ---------- pcap : str, default: None Path to a pcap file. iface : str, default: None Name of a network interface. pcap_list : str or list of str, default: None Path to a list of pcap files, e.g., `'/tmp/myPcaps.txt'` or list of path to pcap files, e.g., `['file1.pcap', 'file2.pcap']`. output_prefix : str, default: None If `None`, automatically derived from input. save_monitoring: bool, default: False Save the monitoring report in a `_monitoring.txt` file. packet_mode : bool, default: False Activate Tranalyzer2 packet mode. plugin_folder : str, default: None Path to the plugin folder. plugins : list of str, default: None The list of plugins to use. If `None`, use `T2.list_plugins()`. loading_list : str, default: None Path to a plugin loading list. If `None`, create one using `T2.list_plugins()`. bpf : str, default: None A BPF filter. rebuild : bool, default, default: False Apply the current changes and rebuild Tranalyzer2 and the plugins. streaming: bool, default: False If `True`, then `T2.stream()` can be used to yield one flow at a time. timeout : int, default: None Number of seconds after which to terminate the process. If `None`, run forever or until the end of file is reached. verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ OSError If specified interface `iface` does not exist locally. RuntimeError - If none or more than one input (`pcap`, `pcap_list`, `iface`) is specified. - If socketSink plugin is not loaded or misconfigured when `T2.streaming` is `True`. subprocess.CalledProcessError If the process exited with a non-zero exit code. subprocess.TimeoutExpired If `timeout` was expired and its value reached. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', output_format='txt') >>> t2.build() >>> t2.run() >>> t2.run(packet_mode=True) """ inputs = (pcap, pcap_list, iface) if sum(bool(i) for i in inputs) > 1: raise RuntimeError('Only one input (pcap, pcap_list, iface) may be specified') inputs += (self.pcap, self.pcap_list, self.iface) if not any(bool(i) for i in inputs): raise RuntimeError('An input (pcap, pcap_list or iface) is required') if pcap: self.pcap = pcap self.pcap_list = None self.iface = None elif iface: self.pcap = None self.pcap_list = None self.iface = iface elif pcap_list: self.pcap = None self.pcap_list = pcap_list self.iface = None if output_prefix: self.output_prefix = output_prefix if plugin_folder: self.plugin_folder = plugin_folder if loading_list: self.loading_list = loading_list elif not plugins: plugins = list(self._plugins.keys()) if save_monitoring: self.save_monitoring = save_monitoring if packet_mode: self.packet_mode = packet_mode if bpf: self.bpf = bpf if streaming or self.streaming: if 'socketSink' not in self._plugins: raise RuntimeError('socketSink plugin is required for running in streaming mode') if (self.socketSink.SKS_SOCKTYPE != 1 or self.socketSink.SKS_CONTENT_TYPE != 2 or self.socketSink.SKS_GZ_COMPRESS != 0): raise RuntimeError("Invalid configuration for socketSink in streaming mode: " "requires 'SKS_SOCKTYPE=1', 'SKS_CONTENT_TYPE=2' and " "'SKS_GZ_COMPRESS=0'") self.streaming = True if rebuild: if not self.loading_list: self.apply_changes() self.build() else: self.tranalyzer2.apply_changes() self.tranalyzer2.build() with open(self.loading_list, encoding='utf-8') as f: for plugin in f: T2Utils.apply_config(plugin.strip()) self.build(plugin=self.loading_list) def _run_tranalyzer(self): T2Utils.run_tranalyzer( t2_exec=self.t2_exec, pcap=self.pcap, iface=self.iface, pcap_list=self.pcap_list, output_prefix=self.output_prefix, log_file=True, save_monitoring=self.save_monitoring, packet_mode=self.packet_mode, plugin_folder=self.plugin_folder, bpf=self.bpf, loading_list=self.loading_list, plugins=plugins, timeout=timeout, verbose=verbose) if not self.streaming: _run_tranalyzer(self) else: self._setup_streaming() thread = threading.Thread(target=_run_tranalyzer, args=(self,)) thread.daemon = True # stop thread when run_tranalyzer returns thread.start()Run Tranalyzer2.
Parameters
pcap:str, default: None- Path to a pcap file.
iface:str, default: None- Name of a network interface.
pcap_list:strorlistofstr, default: None- Path to a list of pcap files, e.g.,
'/tmp/myPcaps.txt'or list of path to pcap files, e.g.,['file1.pcap', 'file2.pcap']. output_prefix:str, default: None- If
None, automatically derived from input. save_monitoring:bool, default: False- Save the monitoring report in a
_monitoring.txtfile. packet_mode:bool, default: False- Activate Tranalyzer2 packet mode.
plugin_folder:str, default: None- Path to the plugin folder.
plugins:listofstr, default: None-
The list of plugins to use.
If
None, useT2.list_plugins(). loading_list:str, default: None-
Path to a plugin loading list.
If
None, create one usingT2.list_plugins(). bpf:str, default: None- A BPF filter.
rebuild:bool, default, default: False- Apply the current changes and rebuild Tranalyzer2 and the plugins.
streaming:bool, default: False- If
True, thenT2.stream()can be used to yield one flow at a time. timeout:int, default: None-
Number of seconds after which to terminate the process.
If
None, run forever or until the end of file is reached. verbose:bool, default: False-
- If
True, print the output (stdoutandstderr) of the command. - If
False, do not print anything.
- If
Raises
OSError- If specified interface
ifacedoes not exist locally. RuntimeError-
- If none or more than one input (
pcap,pcap_list,iface) is specified. - If socketSink plugin is not loaded or misconfigured when
T2.streamingisTrue.
- If none or more than one input (
subprocess.CalledProcessError- If the process exited with a non-zero exit code.
subprocess.TimeoutExpired- If
timeoutwas expired and its value reached.
Examples
>>> t2 = T2(pcap='/tmp/file.pcap', output_format='txt') >>> t2.build() >>> t2.run() >>> t2.run(packet_mode=True) def set_plugins(self, plugins: List[str] = None)-
Expand source code
def set_plugins( self, plugins: List[str] = None ): """Overwrite the current list of plugins. Parameters ---------- plugins : list of str, default: None The list of plugins to use. If `None`, empty the list of plugins. See Also -------- T2.list_plugins : Alphabetically list the currently active plugins. Examples -------- >>> t2 = T2() >>> t2.set_plugins(['arpDecode', 'basicStats']) >>> t2.list_plugins() ['arpDecode', 'basicStats'] >>> type(t2.arpDecode) <class 't2py.T2Plugin.T2Plugin'> >>> t2.basicStats.description 'Basic statistics' """ self.clear_plugins() if plugins: self._plugins = {plugin: T2Plugin(plugin) for plugin in plugins} self._add_dict_entries(plugins)Overwrite the current list of plugins.
Parameters
plugins:listofstr, default: None-
The list of plugins to use.
If
None, empty the list of plugins.
See Also
T2.list_plugins()- Alphabetically list the currently active plugins.
Examples
>>> t2 = T2() >>> t2.set_plugins(['arpDecode', 'basicStats']) >>> t2.list_plugins() ['arpDecode', 'basicStats'] >>> type(t2.arpDecode) <class 't2py.T2Plugin.T2Plugin'> >>> t2.basicStats.description 'Basic statistics' def status(self)-
Expand source code
def status(self): """Print the current configuration status of Tranalyzer2 and the plugins (sorted by plugin numbers). Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', output_format='csv.gz') >>> t2.status() """ bold = '\033[1m' blue = '\033[0;34m' orange = '\033[0;33m' reset = '\033[0m' def _pending_changes(): if pending_changes == 0: return '' return (' ' f'{blue}' f'[{pending_changes} change{"s" if pending_changes > 1 else ""} pending]' f'{reset}') def _loading_list_status(): if self.loading_list and plugin_name not in self._whitelisted_plugins: return f' {orange}[NOT IN LOADING LIST]{reset}' return '' if self._tranalyzer2: t2 = self.tranalyzer2 pending_changes = len(t2.changes) if pending_changes > 0: print(f'{bold}Tranalyzer2{reset}', end='') print(f'{blue}{_pending_changes()}{reset}:') for name, value in t2.changes.items(): print(f' {name} = {value}') print() print(bold, 'Plugins:', reset) num_plugins = 0 for plugin_name in sorted(self._plugins, key=lambda name: self._plugins[name].number): plugin = self._plugins[plugin_name] pending_changes = len(plugin.changes) print(f' {num_plugins + 1}: {plugin_name}' f'{_pending_changes()}' f'{_loading_list_status()}') if pending_changes > 0: for name, value in plugin.changes.items(): print(f' {name} = {value}') num_plugins += 1 if num_plugins == 0: print(' None') print(f'\n{bold}Tranalyzer options:{reset}') if self.pcap: print(f' -r {self.pcap}') elif self.iface: print(f' -i {self.iface}') elif self.pcap_list: if isinstance(self.pcap_list, list): print(' -R /tmp/pcap_list.txt') else: print(f' -R {self.pcap_list}') if self.output_prefix: print(f' -w {self.output_prefix}') if self.loading_list: print(f' -b {self.loading_list}') if self.plugin_folder: print(f' -p {self.plugin_folder}') if self.save_monitoring: print(' -m') if self.packet_mode: print(' -s') print(' -l') if self.bpf: print(f' BPF filter: {self.bpf}')Print the current configuration status of Tranalyzer2 and the plugins (sorted by plugin numbers).
Examples
>>> t2 = T2(pcap='/tmp/file.pcap', output_format='csv.gz') >>> t2.status() def stream(self) ‑> Iterator[Dict[str, Any]]-
Expand source code
def stream(self) -> Iterator[Dict[str, Any]]: """Iterator returning flow by flow. Raises ------ RuntimeError - If the function was called, without configuring streaming mode first. - If socketSink did not open connection. Yields ------ dict The next flow represented as a dict with the column names as key. Examples -------- >>> t2 = T2(streaming=True, plugins=['basicFlow']) >>> t2.run(pcap='file.pcap') >>> for flow in t2.stream(): ... print(f"{flow['srcIP']}:{flow['srcPort']} -> {flow['dstIP']}:{flow['dstPort']}") """ if not self.streaming: raise RuntimeError("Calling 'stream' without configuring streaming mode first.") if not self._conn: time.sleep(2) # wait for tranalyzer to start and socketSink to connect if not self._conn: raise RuntimeError('socketSink did not open connection') buf = b'' while True: buf += self._conn.recv(1024) if not buf: break lines = buf.split(b'\n') for line in lines[:-1]: yield json.loads(line) buf = lines[-1]Iterator returning flow by flow.
Raises
RuntimeError-
- If the function was called, without configuring streaming mode first.
- If socketSink did not open connection.
Yields
dict- The next flow represented as a dict with the column names as key.
Examples
>>> t2 = T2(streaming=True, plugins=['basicFlow']) >>> t2.run(pcap='file.pcap') >>> for flow in t2.stream(): ... print(f"{flow['srcIP']}:{flow['srcPort']} -> {flow['dstIP']}:{flow['dstPort']}") def to_pandas(self, infile: str = None, delimiter: str = None) ‑> pandas.core.frame.DataFrame-
Expand source code
def to_pandas( self, infile: str = None, delimiter: str = None ) -> "pandas.core.frame.DataFrame": """Convert the flow file or `infile` to pandas `DataFrame`. Parameters ---------- infile : str Path to a flow or packet file. delimiter : str, default: None Field delimiter used in the input file. Returns ------- pd.DataFrame DataFrame holding the tabular data stored in `infile`. Raises ------ RuntimeError If `infile` could not be found. Examples -------- >>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> df = t2.to_pandas() >>> type(df) <class 'pandas.core.frame.DataFrame'> """ infile = infile if infile else self.flow_file() if not infile: raise RuntimeError('Run Tranalyzer with the txtSink or jsonSink plugin to create ' 'a flow file') return T2Utils.to_pandas(infile, delimiter)Convert the flow file or
infileto pandasDataFrame.Parameters
infile:str- Path to a flow or packet file.
delimiter:str, default: None- Field delimiter used in the input file.
Returns
pd.DataFrame- DataFrame holding the tabular data stored in
infile.
Raises
RuntimeError- If
infilecould not be found.
Examples
>>> t2 = T2(pcap='/tmp/file.pcap', plugins=['basicFlow', 'txtSink']) >>> t2.build() >>> t2.run() >>> df = t2.to_pandas() >>> type(df) <class 'pandas.core.frame.DataFrame'> def unload(self,
plugin: str | List[str] = None,
plugin_folder: str = None,
verbose: bool = False)-
Expand source code
def unload( self, plugin: Union[str, List[str]] = None, plugin_folder: str = None, verbose: bool = False ): """Remove (unload) plugin(s) from the plugin folder. Parameters ---------- plugin : str or list of str, default: None `plugin` can be: - a single plugin, e.g., `'pluginName'` - a list of plugins, e.g., `['pluginName1', 'pluginName2']` - a plugin loading list, e.g., `'myPlugins.txt'` - `all` to apply the operation to all available plugins - If `None`, use `T2.list_plugins()`. plugin_folder : str, default: None The plugin folder from where the plugin must be unloaded. If `None`, use `T2.plugin_folder`. verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ subprocess.CalledProcessError If the process exited with a non-zero exit code. Examples -------- >>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags']) >>> t2.build() >>> t2.unload() """ plugin_folder = plugin_folder if plugin_folder else self.plugin_folder to_unload = plugin if plugin else list(self._plugins.keys()) T2Utils.unload(to_unload, plugin_folder, verbose=verbose)Remove (unload) plugin(s) from the plugin folder.
Parameters
plugin:strorlistofstr, default: None-
plugincan be:- a single plugin, e.g.,
'pluginName' - a list of plugins, e.g.,
['pluginName1', 'pluginName2'] - a plugin loading list, e.g.,
'myPlugins.txt' allto apply the operation to all available plugins- If
None, useT2.list_plugins().
- a single plugin, e.g.,
plugin_folder:str, default: None-
The plugin folder from where the plugin must be unloaded.
If
None, useT2.plugin_folder. verbose:bool, default: False-
- If
True, print the output (stdoutandstderr) of the command. - If
False, do not print anything.
- If
Raises
subprocess.CalledProcessError- If the process exited with a non-zero exit code.
Examples
>>> t2 = T2(plugins=['arpDecode', 'basicStats', 'tcpFlags']) >>> t2.build() >>> t2.unload()