Source code for cypresspoint.spath

import re
from typing import Any, List, Tuple


[docs] def sanitize_fieldname(field: str) -> str: """ Remove unwanted characters from the provided field name. The goal is to mimic the general field cleanup behavior of Splunk """ clean = re.sub(r'[^A-Za-z0-9_.{}\[\]-]', "_", field) # Remove leading/trailing underscores clean = clean.strip("_") return clean
def _dict_to_splunk_fields(obj: Any, prefix: Tuple[str] = () ) -> List[Tuple[Tuple[str, str], str]]: """ Input: Object (dict, list, str/int/float) Output: [ ( (name,name), value) ] Convention: Arrays suffixed with "{}" """ output = [] if isinstance(obj, dict): for key, value in obj.items(): key = sanitize_fieldname(key) output.extend(_dict_to_splunk_fields(value, prefix=prefix+(key,))) elif isinstance(obj, (list)): if prefix: prefix = prefix[:-1] + (prefix[-1] + "{}",) for item in obj: output.extend(_dict_to_splunk_fields(item, prefix=prefix)) elif isinstance(obj, bool): output.append((prefix, "true" if obj else "false")) elif isinstance(obj, (str, int, float)) or obj is None: output.append((prefix, obj)) else: raise TypeError("Unsupported datatype {}".format(type(obj))) return output
[docs] def splunk_dot_notation(obj: dict) -> dict: """ Convert json object (python dictionary) into a list of fields as Splunk does by default. Think of this as the same as calling Splunk's "spath" SPL command. """ d = {} if not isinstance(obj, dict): raise ValueError("Expected obj to be a dictionary, received {}" .format(type(obj))) for field_pair, value in _dict_to_splunk_fields(obj): field_name = ".".join(field_pair) if field_name in d: if not isinstance(d[field_name], list): d[field_name] = [d[field_name]] d[field_name].append(value) else: d[field_name] = value return d