Generic Helpers
Source code in linecaspy/helpers/generic.py
class HelperFunctions:
    """Grab-bag of generic helpers: path/IP/JSON checks, env vars, HTTP, sockets, subprocess."""

    def __init__(self):
        pass

    def check_if_directory(self, path):
        """Checks if a dir exists at given location

        Args:
            path (string): Of directory location

        Returns:
            bool: if directory exists
        """
        path = os.path.abspath(path)
        if os.path.isdir(path):
            return True
        log.error(f"Error checking: {path}")
        return False

    def check_if_file(self, path):
        """Checks if a file exists at given location

        Args:
            path (string): Of file location

        Returns:
            bool: if file exists
        """
        path = os.path.abspath(path)
        if os.path.isfile(path):
            return True
        log.error(f"Error checking: {path}")
        return False

    def check_if_ip_address(self, input):
        """Checks if IP address is valid (works for IPv4 and IPv6)

        Args:
            input (string): Of string to check

        Returns:
            bool: if input is a valid IP address
        """
        try:
            ip_address(input)
        except ValueError:
            log.error(f"Error checking: {input}")
            return False
        return True

    def check_if_json(self, input):
        """Checks if input is valid JSON

        Args:
            input (string): Of string to check

        Returns:
            bool: if is valid JSON
        """
        try:
            json.loads(input)
        except (ValueError, TypeError):
            # ValueError: malformed document; TypeError: input is not str/bytes/bytearray
            return False
        return True

    def debug_requests_results(self, r):
        """Adds debug logging of Requests object

        Args:
            r (requests.models.Response): The python requests object
        """
        log.info(f"Response Status Code: {r.status_code}")
        log.debug(f"Response Execution Time: {r.elapsed.total_seconds()}")
        log.debug(f"Response Headers: {r.headers}")
        try:
            log.debug(f"Response Content: {json.dumps(r.json(), indent=4)}")
        except Exception as e:
            # Best effort: if the body cannot be rendered as JSON, log the raw text instead
            log.debug(f"Response Text: {r.text}")
            log.debug(f"Exception: {e}")

    def enable_debugger(self):
        """Enables Python debugger if environment var DEBUGGER=True

        Blocks until a debugger client attaches when enabled.

        Returns:
            bool: of environment var DEBUGGER
        """
        debugger = self.get_env_var(key="DEBUGGER", default="False")
        if debugger == "True":
            # NOTE(review): listens on all interfaces (0.0.0.0) on port 5678
            debugpy.listen(("0.0.0.0", 5678))
            log.info("Waiting for debugger to attach")
            debugpy.wait_for_client()
            return True
        log.info("Debugging not enabled, nothing to do")
        log.info("If desired, set: export DEBUGGER=True")
        return False

    def get_current_time_stamp(self):
        """Gets current date-time and returns in standard form

        Returns:
            string: of current date-time formatted as YYYYmmddHHMMSS
        """
        return datetime.now().strftime("%Y%m%d%H%M%S")

    def get_current_time_epoch(self):
        """Gets current epoch timestamp

        Returns:
            int: of current epoch date-time (fractional seconds truncated)
        """
        return int(datetime.now().timestamp())

    def get_env_var(self, key, default=None, raise_error=None):
        """Handles getting/defaults/raising-errors of environment variables

        Args:
            key (string): Name of environment variable to get
            default (string): Value returned when the variable is unset or empty
            raise_error (bool): if truthy, raise ValueError when the resolved value is None or ""

        Returns:
            string: of the Environment Variable, otherwise the default

        Raises:
            ValueError: if raise_error is truthy and the resolved value is None or ""
        """
        log.debug(f"Getting environment variable: {key}")
        value = os.getenv(key, default)
        if value is None or value == "":
            if raise_error:
                raise ValueError(f"{key} not set or has no value")
            return default
        return value

    def get_linecas_config(self):
        """Read in LineCas config YAML

        Returns:
            dict: of config (whatever yaml.safe_load produces for the file)
        """
        # Explicit encoding so decoding does not depend on the platform locale
        with open(C.LINECAS_CONFIG_LOCATION, encoding="utf-8") as f:
            config = yaml.safe_load(f)
        return config

    def get_linecaspy_version(self):
        """Prints package version plus executable and Python interpreter details"""
        print("\n")
        print(f"LinecasPy {__version__}")
        print(f" executable location = {sys.argv[0]}")
        print(f" python platform version = {platform.python_version()}")
        print(f" python sys version = {''.join(sys.version.splitlines())}")
        print("\n\n")

    def gzip_file(self, outfile):
        """Performs gzip on a file

        Args:
            outfile (string): file path

        Raises:
            subprocess.CalledProcessError: if gzip exits non-zero (check=True)
        """
        log.info(f"Performing gzip on file: {outfile}")
        subprocess.run(["gzip", "-9", "--rsyncable", outfile], check=True)

    def lambda_log_event_context(self, event, context):
        """Logs information on invoked AWS Lambda function

        Args:
            event: the Lambda event; logged only when truthy
            context: the Lambda context object; its attributes are logged only when truthy
        """
        if event:
            log.info(f"EVENT: event is of type: {type(event)} and data: {event}")
        if context:
            log.info(f"CONTEXT: context is of type: {type(context)} and data: {context}")
            log.info(f"CONTEXT: Client context: {context.client_context}")
            log.info(f"CONTEXT: CloudWatch log group name: {context.log_group_name}")
            log.info(f"CONTEXT: CloudWatch log stream name: {context.log_stream_name}")
            log.info(f"CONTEXT: Function ARN: {context.invoked_function_arn}")
            log.info(f"CONTEXT: Function memory limits in MB: {context.memory_limit_in_mb}")
            log.info(f"CONTEXT: Function name: {context.function_name}")
            log.info(f"CONTEXT: Function version: {context.function_version}")
            log.info(f"CONTEXT: Request ID: {context.aws_request_id}")

    def port_check(self, dest_host, dest_port):
        """Checks if remote host passes a basic port test

        Args:
            dest_host (string): name/ip to connect to
            dest_port (int): tcp port number

        Returns:
            bool: of result
        """
        log.debug(f"HOST: {dest_host}, PORT: {dest_port}")
        result = None
        try:
            # Context manager closes the socket on every path (previously leaked)
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(2)
                # connect_ex returns an errno-style code instead of raising on connect failure
                result = sock.connect_ex((dest_host, int(dest_port)))
            log.debug(f"RESULT: {result}")
        except socket.gaierror:
            log.error(f"DNS Name or Service connection failure for {dest_host}")
        if result == 0:
            log.info(f"Port check succeeded to {dest_host} on TCP {dest_port}")
            return True
        log.error(f"Port check failed to {dest_host} on TCP {dest_port}")
        return False

    def pretty_table_create(self, fields, **kwargs):
        """Sets up Pretty Table object

        Args:
            fields (list): of strings for columns
            **kwargs: column alignments, keyed by column name

        Returns:
            prettytable.prettytable.PrettyTable: of table object
        """
        pt = PrettyTable(fields)
        for key, value in kwargs.items():
            log.debug(f"key: {key}, value: {value}")
            pt.align[key] = value
        return pt

    def pretty_table_print(self, pt, sort_key=None):
        """Sorts and prints the Pretty Table object

        Args:
            pt (prettytable.prettytable.PrettyTable): of table object
            sort_key (string): column name to sort by; no sorting is set when falsy
        """
        if sort_key:
            pt.sortby = sort_key
        print(pt)

    def requests_get_url(self, url, headers, expected_rc=200, verify=True, timeout=5):
        """Gets URL and returns response object if status code matches

        Args:
            url (string): of URL
            headers (dict): of HTTP Headers
            expected_rc (int): of HTTP Status Code expected to be returned
            verify (bool): if SSL verification should be performed
            timeout (int): seconds of response time to wait

        Returns:
            requests.Response: of response otherwise None
        """
        r = requests.get(url=url, headers=headers, verify=verify, timeout=timeout)
        log.debug(f"Response status_code: {r.status_code}")
        if r.status_code != expected_rc:
            log.error(f"Expected {expected_rc} status code. Received: {r.status_code}")
            log.error(f"Reponse text: {r.text}")
            return None
        return r

    def requests_put_url(self, url, headers, data, expected_rc=204, verify=True, timeout=5):
        """Sends a PUT to the URL and returns response object if status code matches

        Args:
            url (string): of URL
            headers (dict): of HTTP Headers
            data (dict): of JSON payload (serialised with json.dumps before sending)
            expected_rc (int): of HTTP Status Code expected to be returned
            verify (bool): if SSL verification should be performed
            timeout (int): seconds of response time to wait

        Returns:
            requests.Response: of response otherwise None
        """
        r = requests.put(url=url, headers=headers, data=json.dumps(data), verify=verify, timeout=timeout)
        log.debug(f"Response status_code: {r.status_code}")
        if r.status_code != expected_rc:
            log.error(f"Expected {expected_rc} status code. Received: {r.status_code}")
            log.error(f"Reponse text: {r.text}")
            return None
        return r

    def requests_return_json_response(self, r, debug=False):
        """Analyzes if the text of a Requests call returned a JSON response

        Args:
            r (requests.Response): of requests response
            debug (bool): if the parsed JSON should also be pretty-printed to stdout

        Returns:
            dict: of JSON response, otherwise None
        """
        if self.check_if_json(r.text):
            if debug:
                pprint(r.json())
            return r.json()
        log.warning(f"Invalid JSON with: {r.text}")
        return None

    def return_none_or_sys_exit(self, sys_exit=False, exit_code=1):
        """Error handling function to either return None or perform a sys.exit() call

        Args:
            sys_exit (bool): if the process should forcibly close
            exit_code (int): the return code of the process

        Returns:
            None: None or a sys.exit
        """
        if sys_exit:
            sys.exit(exit_code)
        return None

    def run_command(self, command, expected_rc=(0,)):
        """Run command, check return code, and return output

        Args:
            command (string): of the command to run (split with shlex, no shell involved)
            expected_rc (list): of ints of accepted command return codes
                (the return code is only returned if multiple are specified)

        Returns:
            string: of command output (stripped stdout followed by stripped stderr)
            int: of return code, only when more than one expected_rc was given

        Raises:
            Exception: if the command cannot be started or returns an unexpected code
        """
        log.debug(f"Running: {command}")
        command_list = shlex.split(command)
        try:
            result = subprocess.run(
                command_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False
            )
        except Exception as e:
            # Keep the generic Exception type callers may catch, but chain the real cause
            raise Exception(f"Failed to execute command: {command}. Error: {e}") from e
        rc = result.returncode
        output = result.stdout.strip() + result.stderr.strip()
        if rc not in expected_rc:
            raise Exception(f"Command: {command} returned: {rc} with output: {output}")
        if len(expected_rc) == 1:
            return output
        return output, rc
check_if_directory(self, path)
Checks if a dir exists at given location
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path |
string |
Of directory location |
required |
Returns:
| Type | Description |
|---|---|
bool |
if directory exists |
Source code in linecaspy/helpers/generic.py
def check_if_directory(self, path):
    """Report whether an existing directory is found at the given location.

    Args:
        path (string): directory location; resolved to an absolute path first

    Returns:
        bool: True when the directory exists, otherwise False (and an error is logged)
    """
    path = os.path.abspath(path)
    found = os.path.isdir(path)
    if not found:
        log.error(f"Error checking: {path}")
    return found
check_if_file(self, path)
Checks if a file exists at given location
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path |
string |
Of file location |
required |
Returns:
| Type | Description |
|---|---|
bool |
if file exists |
Source code in linecaspy/helpers/generic.py
def check_if_file(self, path):
    """Report whether an existing regular file is found at the given location.

    Args:
        path (string): file location; resolved to an absolute path first

    Returns:
        bool: True when the file exists, otherwise False (and an error is logged)
    """
    path = os.path.abspath(path)
    found = os.path.isfile(path)
    if not found:
        log.error(f"Error checking: {path}")
    return found
check_if_ip_address(self, input)
Checks if IP address is valid (works for IPv4 and IPv6)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
input |
string |
Of string to check |
required |
Returns:
| Type | Description |
|---|---|
bool |
if input is a valid IP address |
Source code in linecaspy/helpers/generic.py
def check_if_ip_address(self, input):
    """Validate that the input parses as an IPv4 or IPv6 address.

    Args:
        input (string): candidate address

    Returns:
        bool: True when ip_address() accepts the input, otherwise False (error logged)
    """
    try:
        ip_address(input)
    except ValueError:
        log.error(f"Error checking: {input}")
        return False
    else:
        return True
check_if_json(self, input)
Checks if input is valid JSON
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
input |
string |
Of string to check |
required |
Returns:
| Type | Description |
|---|---|
bool |
if is valid JSON |
Source code in linecaspy/helpers/generic.py
def check_if_json(self, input):
    """Tell whether the input can be parsed as JSON.

    Args:
        input (string): candidate JSON document

    Returns:
        bool: True when json.loads succeeds, False on ValueError or TypeError
    """
    try:
        json.loads(input)
    except (ValueError, TypeError):
        # Malformed text, or an input type json.loads does not accept
        return False
    return True
debug_requests_results(self, r)
Adds debug logging of Requests object
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
r |
requests.models.Response |
The python requests object |
required |
Source code in linecaspy/helpers/generic.py
def debug_requests_results(self, r):
    """Log status, timing, headers and body of a Requests response.

    The status code is logged at info level; everything else at debug level.
    If the body cannot be rendered as indented JSON, the raw text and the
    exception are logged instead.

    Args:
        r (requests.models.Response): the response to describe
    """
    log.info(f"Response Status Code: {r.status_code}")
    log.debug(f"Response Execution Time: {r.elapsed.total_seconds()}")
    log.debug(f"Response Headers: {r.headers}")
    try:
        log.debug(f"Response Content: {json.dumps(r.json(), indent=4)}")
    except Exception as e:
        for fallback_line in (f"Response Text: {r.text}", f"Exception: {e}"):
            log.debug(fallback_line)
enable_debugger(self)
Enables Python debugger if environment var DEBUGGER=True
Returns:
| Type | Description |
|---|---|
bool |
of environment var DEBUGGER |
Source code in linecaspy/helpers/generic.py
def enable_debugger(self):
    """Start a debugpy listener when the DEBUGGER environment variable equals "True".

    When enabled, listens on 0.0.0.0:5678 and blocks until a client attaches.

    Returns:
        bool: True if the debugger was enabled, otherwise False
    """
    debugger = self.get_env_var(key="DEBUGGER", default="False")
    if debugger != "True":
        log.info("Debugging not enabled, nothing to do")
        log.info("If desired, set: export DEBUGGER=True")
        return False

    listen_address = ("0.0.0.0", 5678)
    debugpy.listen(listen_address)
    log.info("Waiting for debugger to attach")
    debugpy.wait_for_client()
    return True
get_current_time_epoch(self)
Gets current epoch timestamp
Returns:
| Type | Description |
|---|---|
int |
of current epoch date-time |
Source code in linecaspy/helpers/generic.py
def get_current_time_epoch(self):
    """Return the current time as whole seconds since the epoch.

    Returns:
        int: current epoch timestamp, fractional part truncated
    """
    now = datetime.now()
    return int(now.timestamp())
get_current_time_stamp(self)
Gets current date-time and returns in standard form
Returns:
| Type | Description |
|---|---|
string |
of current date-time |
Source code in linecaspy/helpers/generic.py
def get_current_time_stamp(self):
    """Return the current local date-time as a compact digit string.

    Returns:
        string: current date-time formatted as YYYYmmddHHMMSS
    """
    now = datetime.now()
    return now.strftime("%Y%m%d%H%M%S")
get_env_var(self, key, default=None, raise_error=None)
Handles getting/defaults/raising-errors of environment variables
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | string | Name of environment variable to get | required |
| default | string | Value returned when the variable is unset or empty | None |
| raise_error | bool | If truthy, raises a ValueError when the resolved value is missing or empty | None |
Returns:
| Type | Description |
|---|---|
string |
of the Environment Variable |
Source code in linecaspy/helpers/generic.py
def get_env_var(self, key, default=None, raise_error=None):
    """Fetch an environment variable with default and optional error handling.

    Args:
        key (string): name of the environment variable
        default (string): value used when the variable is unset or empty
        raise_error (bool): if truthy, raise instead of returning the default
            when the resolved value is None or ""

    Returns:
        string: the variable's value, otherwise the default

    Raises:
        ValueError: if raise_error is truthy and the resolved value is None or ""
    """
    log.debug(f"Getting environment variable: {key}")
    value = os.getenv(key, default)
    missing = value is None or value == ""
    if not missing:
        return value
    if raise_error:
        raise ValueError(f"{key} not set or has no value")
    return default
get_linecas_config(self)
Read in LineCas config YAML
Returns:
| Type | Description |
|---|---|
dict |
of config |
Source code in linecaspy/helpers/generic.py
def get_linecas_config(self):
    """Read in LineCas config YAML

    The file at C.LINECAS_CONFIG_LOCATION is opened as UTF-8 text and parsed
    with yaml.safe_load.

    Returns:
        dict: of config (whatever yaml.safe_load produces for the file)
    """
    # Explicit encoding so decoding does not depend on the platform locale
    with open(C.LINECAS_CONFIG_LOCATION, encoding="utf-8") as f:
        config = yaml.safe_load(f)
    return config
get_linecaspy_version(self)
Gets package version
Source code in linecaspy/helpers/generic.py
def get_linecaspy_version(self):
    """Print the package version with executable and Python interpreter details."""
    banner = (
        "\n",
        f"LinecasPy {__version__}",
        f" executable location = {sys.argv[0]}",
        f" python platform version = {platform.python_version()}",
        f" python sys version = {''.join(sys.version.splitlines())}",
        "\n\n",
    )
    for line in banner:
        print(line)
gzip_file(self, outfile)
Performs gzip on a file
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
outfile |
string |
file path |
required |
Source code in linecaspy/helpers/generic.py
def gzip_file(self, outfile):
    """Compress a file by invoking the external gzip program.

    Args:
        outfile (string): path of the file to compress

    Raises:
        subprocess.CalledProcessError: if gzip exits with a non-zero status
    """
    log.info(f"Performing gzip on file: {outfile}")
    gzip_command = ["gzip", "-9", "--rsyncable", outfile]
    subprocess.run(gzip_command, check=True)
lambda_log_event_context(self, event, context)
Logs information on invoked AWS Lambda function
Source code in linecaspy/helpers/generic.py
def lambda_log_event_context(self, event, context):
    """Log details of an AWS Lambda invocation.

    Args:
        event: the Lambda event; logged only when truthy
        context: the Lambda context object; its attributes are logged only when truthy
    """
    if event:
        log.info(f"EVENT: event is of type: {type(event)} and data: {event}")

    if not context:
        return

    log.info(f"CONTEXT: context is of type: {type(context)} and data: {context}")
    log.info(f"CONTEXT: Client context: {context.client_context}")
    log.info(f"CONTEXT: CloudWatch log group name: {context.log_group_name}")
    log.info(f"CONTEXT: CloudWatch log stream name: {context.log_stream_name}")
    log.info(f"CONTEXT: Function ARN: {context.invoked_function_arn}")
    log.info(f"CONTEXT: Function memory limits in MB: {context.memory_limit_in_mb}")
    log.info(f"CONTEXT: Function name: {context.function_name}")
    log.info(f"CONTEXT: Function version: {context.function_version}")
    log.info(f"CONTEXT: Request ID: {context.aws_request_id}")
port_check(self, dest_host, dest_port)
Checks if remote host passes a basic port test
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
dest_host |
string |
name/ip to connect to |
required |
dest_port |
int |
tcp port number |
required |
Returns:
| Type | Description |
|---|---|
bool |
of result |
Source code in linecaspy/helpers/generic.py
def port_check(self, dest_host, dest_port):
    """Checks if remote host passes a basic port test

    Opens an IPv4 TCP socket with a 2 second timeout and attempts to connect.
    The socket is always closed before returning.

    Args:
        dest_host (string): name/ip to connect to
        dest_port (int): tcp port number (converted with int())

    Returns:
        bool: True if the connection attempt returned 0, otherwise False
    """
    log.debug(f"HOST: {dest_host}, PORT: {dest_port}")
    result = None
    try:
        # Context manager closes the socket on every path (previously leaked)
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(2)
            # connect_ex returns an errno-style code instead of raising on connect failure
            result = sock.connect_ex((dest_host, int(dest_port)))
        log.debug(f"RESULT: {result}")
    except socket.gaierror:
        # Name resolution failed; result stays None and is reported as a failed check
        log.error(f"DNS Name or Service connection failure for {dest_host}")
    if result == 0:
        log.info(f"Port check succeeded to {dest_host} on TCP {dest_port}")
        return True
    log.error(f"Port check failed to {dest_host} on TCP {dest_port}")
    return False
pretty_table_create(self, fields, **kwargs)
Sets up Pretty Table object
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
fields |
list |
of strings for columns |
required |
**kwargs |
column alignments |
{} |
Returns:
| Type | Description |
|---|---|
prettytable.prettytable.PrettyTable |
of table object |
Source code in linecaspy/helpers/generic.py
def pretty_table_create(self, fields, **kwargs):
    """Build a PrettyTable with the given columns and per-column alignment.

    Args:
        fields (list): of strings for columns
        **kwargs: column alignments, keyed by column name

    Returns:
        prettytable.prettytable.PrettyTable: of table object
    """
    pt = PrettyTable(fields)
    for key in kwargs:
        value = kwargs[key]
        log.debug(f"key: {key}, value: {value}")
        pt.align[key] = value
    return pt
pretty_table_print(self, pt, sort_key=None)
Sorts and prints the Pretty Table object
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pt |
prettytable.prettytable.PrettyTable |
of table object |
required |
| sort_key | string | name of the column to sort by (no sorting is applied when omitted) | None |
Source code in linecaspy/helpers/generic.py
def pretty_table_print(self, pt, sort_key=None):
    """Print the Pretty Table object, optionally sorted by a column.

    Args:
        pt (prettytable.prettytable.PrettyTable): of table object
        sort_key (string): column name assigned to pt.sortby; skipped when falsy
    """
    wants_sorting = bool(sort_key)
    if wants_sorting:
        pt.sortby = sort_key
    print(pt)
requests_get_url(self, url, headers, expected_rc=200, verify=True, timeout=5)
Gets URL and returns request object if status code matches
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
url |
string |
of URL |
required |
headers |
dict |
of HTTP Headers |
required |
expected_rc |
int |
of HTTP Status Code expected to be returned |
200 |
verify |
bool |
if SSL verification should be performed |
True |
timeout |
int |
seconds of response time to wait |
5 |
Returns:
| Type | Description |
|---|---|
requests.Response |
of response otherwise None |
Source code in linecaspy/helpers/generic.py
def requests_get_url(self, url, headers, expected_rc=200, verify=True, timeout=5):
    """Issue a GET request and return the response only if the status code matches.

    Args:
        url (string): of URL
        headers (dict): of HTTP Headers
        expected_rc (int): of HTTP Status Code expected to be returned
        verify (bool): if SSL verification should be performed
        timeout (int): seconds of response time to wait

    Returns:
        requests.Response: of response otherwise None
    """
    r = requests.get(url=url, headers=headers, verify=verify, timeout=timeout)
    log.debug(f"Response status_code: {r.status_code}")
    if r.status_code == expected_rc:
        return r
    # Unexpected status: log what came back and signal failure with None
    log.error(f"Expected {expected_rc} status code. Received: {r.status_code}")
    log.error(f"Reponse text: {r.text}")
    return None
requests_put_url(self, url, headers, data, expected_rc=204, verify=True, timeout=5)
Sends a PUT request to the URL and returns the response object if the status code matches
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
url |
string |
of URL |
required |
headers |
dict |
of HTTP Headers |
required |
data |
dict |
of JSON payload |
required |
expected_rc |
int |
of HTTP Status Code expected to be returned |
204 |
verify |
bool |
if SSL verification should be performed |
True |
timeout |
int |
seconds of response time to wait |
5 |
Returns:
| Type | Description |
|---|---|
requests.Response |
of response otherwise None |
Source code in linecaspy/helpers/generic.py
def requests_put_url(self, url, headers, data, expected_rc=204, verify=True, timeout=5):
    """Issue a PUT request and return the response only if the status code matches.

    Args:
        url (string): of URL
        headers (dict): of HTTP Headers
        data (dict): of JSON payload (serialised with json.dumps before sending)
        expected_rc (int): of HTTP Status Code expected to be returned
        verify (bool): if SSL verification should be performed
        timeout (int): seconds of response time to wait

    Returns:
        requests.Response: of response otherwise None
    """
    r = requests.put(url=url, headers=headers, data=json.dumps(data), verify=verify, timeout=timeout)
    log.debug(f"Response status_code: {r.status_code}")
    if r.status_code == expected_rc:
        return r
    # Unexpected status: log what came back and signal failure with None
    log.error(f"Expected {expected_rc} status code. Received: {r.status_code}")
    log.error(f"Reponse text: {r.text}")
    return None
requests_return_json_response(self, r, debug=False)
Analyzes if the text of a Requests call returned a JSON response
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
r |
requests.Response |
of requests response |
required |
Returns:
| Type | Description |
|---|---|
dict |
of JSON response, otherwise None |
Source code in linecaspy/helpers/generic.py
def requests_return_json_response(self, r, debug=False):
    """Return the parsed JSON body of a Requests response, if it has one.

    Args:
        r (requests.Response): of requests response
        debug (bool): if the parsed JSON should also be pretty-printed to stdout

    Returns:
        dict: of JSON response, otherwise None
    """
    if not self.check_if_json(r.text):
        log.warning(f"Invalid JSON with: {r.text}")
        return None
    if debug:
        pprint(r.json())
    return r.json()
return_none_or_sys_exit(self, sys_exit=False, exit_code=1)
Error handling function to either return None or perform a sys.exit() call
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
sys_exit |
bool |
if the process should forcibly close |
False |
exit_code |
int |
the return code of the process |
1 |
Returns:
| Type | Description |
|---|---|
None |
None or a sys.exit |
Source code in linecaspy/helpers/generic.py
def return_none_or_sys_exit(self, sys_exit=False, exit_code=1):
    """Either return None or terminate the process via sys.exit().

    Args:
        sys_exit (bool): if the process should forcibly close
        exit_code (int): the return code of the process

    Returns:
        None: when sys_exit is falsy; otherwise sys.exit(exit_code) is called
    """
    if not sys_exit:
        return None
    sys.exit(exit_code)
run_command(self, command, expected_rc=[0])
Run command, check return code, and return output
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
command |
string |
of the command to run |
required |
expected_rc |
list |
of ints of accepted command return codes (only returned if multiple specified) |
[0] |
Returns:
| Type | Description |
|---|---|
| string | of command output; when more than one return code is accepted, a tuple of (output, return code) is returned instead |
Source code in linecaspy/helpers/generic.py
def run_command(self, command, expected_rc=(0,)):
    """Run command, check return code, and return output

    Args:
        command (string): of the command to run (split with shlex, no shell involved)
        expected_rc (list): of ints of accepted command return codes
            (the return code is only returned if multiple are specified)

    Returns:
        string: of command output (stripped stdout followed by stripped stderr)
        int: of return code, only when more than one expected_rc was given

    Raises:
        Exception: if the command cannot be started or returns an unexpected code
    """
    log.debug(f"Running: {command}")
    command_list = shlex.split(command)
    try:
        result = subprocess.run(
            command_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False
        )
    except Exception as e:
        # Keep the generic Exception type callers may catch, but chain the real cause
        raise Exception(f"Failed to execute command: {command}. Error: {e}") from e
    rc = result.returncode
    output = result.stdout.strip() + result.stderr.strip()
    if rc not in expected_rc:
        raise Exception(f"Command: {command} returned: {rc} with output: {output}")
    if len(expected_rc) == 1:
        return output
    return output, rc