AWS
Sample Commands
on-aws --azs
on-aws --billing
on-aws --delete-record xyz
on-aws --dns
on-aws --eks
on-aws --instances
Source code in linecaspy/aws/aws.py
class AwsClass:
def __init__(self, domain="linecas.com", debug=False):
"""Sets up the clients and DNS information for interaction with AWS
Args:
domain (string): DNS zone to use. If None, default is linecas.com
debug (bool): Used for pprint and log debugging
"""
self.debug = debug
self.ami_image_cache = []
logger.info(f"Intializing the AWS Class using Domain: {domain} with debug: {debug}")
# Clients
self._as = boto3.client("autoscaling")
self._ce = boto3.client("ce")
self._ec2 = boto3.client("ec2")
self._eks = boto3.client("eks")
self._elb_v1 = boto3.client("elb") # NOTE: not elbv2
self._elb = boto3.client("elbv2") # NOTE: not elb
self._r53 = boto3.client("route53")
self._tags = boto3.client("resourcegroupstaggingapi")
# DNS
self.dns_zone_name = domain
self.dns_zone_id = None
self.dns_ttl = 90
def ami_get_friendly_name(self, ami_id, words=3):
"""Gets a shortened name of Image used
Args:
ami_id (string): ID of the AMI
words (int): of the first X amount of strings in name
Returns:
string: a human comprehensible name
"""
# first check if we already looked up this AMI ID
for ami in self.ami_image_cache:
for key, value in ami.items():
if key == ami_id:
friendly_name = value
logger.debug(f"Found {key} with {value}")
return friendly_name
# if not found, then we'll make an API call to discover
response = self._ec2.describe_images(ImageIds=[ami_id])
logger.debug(f"RESPONSE: {response}")
friendly_name = None
images = response.get("Images")
for image in images:
image_name = image.get("Name")
logger.debug(f"IMAGE_NAME: {image_name}")
friendly_name = " ".join(image_name.split()[: int(words)])
# and add that discovery to the image cache
image_dict = {ami_id: friendly_name}
self.ami_image_cache.append(image_dict)
return friendly_name
def autoscaling_get_groups(self, asg_name=None):
"""Collects information on all autoscaling groups
Args:
asg_name (string): name of specific Autoscaling Group
Returns:
list: of dictionaries on autoscaling groups
"""
if asg_name:
response = self._as.describe_auto_scaling_groups(AutoScalingGroupNames=[asg_name])
else:
response = self._as.describe_auto_scaling_groups()
if self.debug:
logger.debug("get response:")
pprint(response.get("AutoScalingGroups"))
logger.info(f"There is {len(response.get('AutoScalingGroups'))} Autoscaling-groups(s) in search query")
return response.get("AutoScalingGroups")
def autoscaling_filter_groups(self):
"""Filters on specific subset of information in autoscaling groups
Returns:
list: of dictionaries on autoscaling groups
"""
all_asgs = []
asgs = self.autoscaling_get_groups()
for asg in asgs:
i = {}
i["asg_arn"] = asg.get("AutoScalingGroupARN")
i["asg_name"] = asg.get("AutoScalingGroupName")
i["asg_size_current"] = len(asg.get("Instances"))
i["asg_size_max"] = asg.get("MaxSize")
i["asg_size_min"] = asg.get("MinSize")
try:
# NOTE: assumes ASG will have single target
i["asg_target_group"] = asg.get("TargetGroupARNs")[0]
except IndexError:
i["asg_target_group"] = None
all_asgs.append(i)
if self.debug:
logger.debug("filtered response:")
pprint(all_asgs)
return all_asgs
def dns_delete_records(self, name, type_r, ttl, resources):
"""Performs the deletion of DNS record
Args:
name (string): Name of record
type_r (string): Type of record like A or CNAME
ttl (string): Time To Live value
resources (list): Destination ResourceRecords
"""
logger.info(f"Received deletion request for {name}")
response = self._r53.change_resource_record_sets(
ChangeBatch={
"Changes": [
{
"Action": "DELETE",
"ResourceRecordSet": {
"Name": name,
"ResourceRecords": resources,
"Type": type_r,
"TTL": ttl,
},
},
]
},
HostedZoneId=self.dns_zone_id,
)
if self.debug:
pprint(response)
logger.info(f"RESPONSE: {response}")
def dns_upsert_record(self, source, dest, r_type="CNAME"):
"""Creates or updates DNS record
Args:
source (string): DNS value you want to map
dest (string): DNS value you want to map to
r_type (string): DNS record type, typically 'A' or 'CNAME'
Returns:
dict: response of DNS event
"""
if self.dns_zone_id is None:
self.dns_zone_id = self.dns_zones(self.dns_zone_name)
response = self._r53.change_resource_record_sets(
HostedZoneId=self.dns_zone_id,
ChangeBatch={
"Comment": f"add {source} -> {dest}",
"Changes": [
{
"Action": "UPSERT",
"ResourceRecordSet": {
"Name": source,
"Type": r_type,
"TTL": self.dns_ttl,
"ResourceRecords": [{"Value": dest}],
},
}
],
},
)
if self.debug:
pprint(response)
logger.info(f"RESPONSE: {response}")
return response
def dns_delete_record_function(self, name):
"""Handler that calls the "dns_zone_records" and "dns_delete_records" functions
Args:
name (string): name of DNS record, will be expanded to FQDN
"""
fqdn = f"{name}.{self.dns_zone_name}."
logger.info(f"Looking for record with name: {fqdn}")
response = self.dns_zone_records()
for i in response["ResourceRecordSets"]:
if i["Name"] == fqdn:
logger.info(f"Deleting DNS Record: {i}")
d_name = i["Name"]
d_type = i["Type"]
d_ttl = i["TTL"]
d_rr = i["ResourceRecords"]
self.dns_delete_records(d_name, d_type, d_ttl, d_rr)
def dns_zone_records(self):
"""Gets all DNS records in Zone
Returns:
(dict): of DNS Records
"""
if self.dns_zone_id is None:
self.dns_zone_id = self.dns_zones(self.dns_zone_name)
response = self._r53.list_resource_record_sets(HostedZoneId=self.dns_zone_id)
if self.debug:
pprint(response)
return response
def dns_zones(self, domain):
"""Logs information and returns information on DNS Zones
Args:
domain (string): The DNS zone name, like "linecas.com"
Returns:
(string): The DNS Zone ID
"""
zones = self._r53.list_hosted_zones()
if self.debug:
pprint(zones)
for i in zones["HostedZones"]:
logger.info(f"{i['Name']} has {i['ResourceRecordSetCount']} records")
if i["Name"] == f"{domain}.":
zone_id = i["Id"]
logger.info(f"Domain Name: {domain} has Zone ID: {zone_id}")
self.dns_zone_id = zone_id
return zone_id
def ec2_get_instances(self):
"""Collects information on all ec2 instances
Returns:
list: of dictionaries on instances
"""
all_instances = []
response = self._ec2.describe_instances()
for reservation in response["Reservations"]:
for instance in reservation["Instances"]:
if self.debug:
pprint(instance)
i = {}
i["instance_tags"] = instance.get("Tags")
i["group_name"] = self.tag_get_value(i["instance_tags"], label="ansible_group", default_value="N/A")
i["instance_name"] = self.tag_get_value(i["instance_tags"], label="Name", default_value="N/A")
i["instance_id"] = instance.get("InstanceId")
i["instance_state"] = instance.get("State").get("Name")
i["instance_type"] = instance.get("InstanceType")
i["instance_private_ip"] = instance.get("PrivateIpAddress")
i["instance_public_ip"] = instance.get("PublicIpAddress", None)
try:
i["ipv6"] = instance.get("NetworkInterfaces")[0].get("Ipv6Addresses")[0].get("Ipv6Address")
except IndexError:
i["ipv6"] = None
i["image_id"] = instance.get("ImageId")
i["image_name"] = self.ami_get_friendly_name(i["image_id"])
i["instance_az"] = instance.get("Placement").get("AvailabilityZone")
all_instances.append(i)
if self.debug:
pprint(all_instances)
logger.info(f"There is {len(all_instances)} total instances")
return all_instances
def ec2_start_instance(self, instance_id):
"""Starts an EC2 instance based on AWS starter code
Args:
instance_id (string): The value of the EC2 Instane ID
"""
# Do a dryrun first to verify permissions
try:
self._ec2.start_instances(InstanceIds=[instance_id], DryRun=True)
except ClientError as e:
if "DryRunOperation" not in str(e):
raise
# Dry run succeeded, run start_instances without dryrun
try:
response = self._ec2.start_instances(InstanceIds=[instance_id], DryRun=False)
logger.info(f"RESPONSE: {response}")
except ClientError as e:
logger.error(f"ERROR: {e}")
sys.exit(1)
def eks_function(self):
"""Configures and returns information about EKS Cluster
EKS could be in States:
1) Does not exist
2) Exists but without LB (rarity)
3) Fully exists
"""
cluster = self.eks_get_cluster()
if cluster:
# setup pretty table
fields = ["Key", "Value"]
alignments = {"Key": "l", "Value": "l"}
pt = hf.pretty_table_create(fields, **alignments)
# get data
lb_tag_resp, lb_tag_dict = self.get_elb_by_tag()
lb, lb_dns = self.elb_get_lb(arn=None, name=lb_tag_dict.get("name"))
self.dns_upsert_record(source="eks.linecas.com", dest=lb_dns)
# add info to table
pt.add_row(["eks_name", cluster.get("cluster").get("name")])
pt.add_row(["eks_version", cluster.get("cluster").get("version")])
pt.add_row(["eks_endpoint", cluster.get("cluster").get("endpoint")])
pt.add_row(["ingress_lb_name", lb_tag_dict.get("name")])
pt.add_row(["ingress_lb_dns", lb_dns])
hf.pretty_table_print(pt, sort_key="Key")
else:
return
def eks_get_cluster(self, cluster="linecas-eks"):
"""Gets details on EKS Cluster
Args:
cluster (string): name of the EKS cluster
Returns:
(dict): response of cluster details otherwise returns None
"""
logger.info(f"Getting info on EKS Cluster: {cluster}")
try:
response = self._eks.describe_cluster(name=cluster)
except self._eks.exceptions.ResourceNotFoundException:
logger.warning(f"EKS Cluster: {cluster} does not exist")
response = None
if self.debug and response:
pprint(response)
return response
def elb_aggregate(self):
"""Aggregates Load Balancer and Listener configurations
Returns:
list: of dictionaries on lbs and listeners
"""
all_lbs = []
details = []
lbs = self.elb_get_loadbalancers()
for lb in lbs:
i = {}
i["lb_arn"] = lb.get("LoadBalancerArn")
i["lb_dns"] = lb.get("DNSName")
i["lb_name"] = lb.get("LoadBalancerName")
i["lb_type"] = lb.get("Type")
all_lbs.append(i)
for lb in all_lbs:
listeners = self.elb_get_listeners(lb["lb_arn"])
for listener in listeners:
j = {}
j["listener_arn"] = listener.get("ListenerArn")
j["listener_default_action"] = listener.get("DefaultActions")[0]
j["listener_default_action_type"] = listener.get("DefaultActions")[0].get("Type")
j["listener_port"] = listener.get("Port")
j["listener_protocal"] = listener.get("Protocol")
# combine the lb and listener dictionaries next
merge = {**lb, **j}
details.append(merge)
if self.debug:
logger.debug("Details on Load Balancers and Listeners")
pprint(details)
logger.info(f"There is {len(details)} listeners across {len(lbs)} load balancers")
return details
def elb_describe_rules(self, listener_arn):
"""Collects information on rules in LB Listener
Args:
listener_arn (string): Unique ARN ID of LB Listener
Returns:
list: of dictionaries on listeners
"""
response = self._elb.describe_rules(ListenerArn=listener_arn)
if self.debug:
pprint(response)
logger.info(f"There is {len(response.get('Rules'))} rules(s) in search query")
return response.get("Rules")
def elb_get_listeners(self, lb_arn):
"""Collects information on listeners in LB
Args:
lb_arn (string): Unique ARN ID of LB
Returns:
list: of dictionaries on listeners
"""
response = self._elb.describe_listeners(LoadBalancerArn=lb_arn)
if self.debug:
logger.debug("get response:")
pprint(response.get("Listeners"))
logger.info(f"There is {len(response.get('Listeners'))} listener(s) in search query")
return response.get("Listeners")
def elb_get_loadbalancers(self, lb_name=None):
"""Collects information on all load balancers
Args:
lb_name (string): optional field to get single LB
Returns:
list: of dictionaries on loadbalancers
"""
if lb_name:
response = self._elb.describe_load_balancers(Names=[lb_name])
else:
response = self._elb.describe_load_balancers()
if self.debug:
pprint(response.get("LoadBalancers"))
logger.info(f"There is {len(response.get('LoadBalancers'))} loadbalancer(s) in search query")
return response.get("LoadBalancers")
def elb_modify_listener(
self, listener_arn, action, fixed_message=None, forward_target_arn=None, redirect_port="80", redirect_url=None
):
"""Modifies the default action of a listener
Args:
listener_arn (string): ID of LB Listener
action (string): Must be one of redirect/fixed/forward
fixed_message (string): if action=fixed, the message to display
forward_target_arn (string): if action=forward, the ASG target
redirect_port (string): if action=redirect with string of destination port
redirect_url (string): if action=redirect the url to 302 redirect to
"""
if action == "redirect":
if not redirect_url:
logger.error(f"redirect_url is: {redirect_url}")
sys.exit(1)
if str(redirect_port) == "443":
protocol = "HTTPS"
else:
protocol = "HTTP"
response = self._elb.modify_listener(
ListenerArn=listener_arn,
DefaultActions=[
{
"Type": "redirect",
"RedirectConfig": {
"Protocol": protocol,
"Port": str(redirect_port),
"Host": redirect_url,
"Path": "/",
"Query": "",
"StatusCode": "HTTP_302",
},
},
],
)
elif action == "fixed":
if not fixed_message:
logger.error(f"fixed_message is: {fixed_message}")
sys.exit(1)
response = self._elb.modify_listener(
ListenerArn=listener_arn,
DefaultActions=[
{
"Type": "fixed-response",
"FixedResponseConfig": {
"MessageBody": fixed_message,
"StatusCode": "200",
"ContentType": "text/html",
},
}
],
)
elif action == "forward":
if not forward_target_arn:
logger.error(f"forward_target_arn is: {forward_target_arn}")
sys.exit(1)
response = self._elb.modify_listener(
ListenerArn=listener_arn,
DefaultActions=[
{
"Type": "forward",
"ForwardConfig": {
"TargetGroups": [
{"TargetGroupArn": forward_target_arn, "Weight": 1},
],
"TargetGroupStickinessConfig": {"Enabled": False},
},
}
],
)
else:
logger.error("This function should have never gotten here")
sys.exit(1)
if self.debug:
pprint(response)
response_code = response.get("ResponseMetadata").get("HTTPStatusCode")
if response_code != 200:
logger.error(f"response_code returned {response_code}")
sys.exit(1)
else:
logger.info(f"Updating listener returned status code: {response_code}")
def pprint_auto_scaling_groups(self):
"""Displays pretty table of Autoscaling Groups"""
fields = ["Name", "Current Size", "Min Size", "Max Size", "Target"]
alignments = {"Name": "l"}
pt = hf.pretty_table_create(fields, **alignments)
asgs = self.autoscaling_filter_groups()
for i in asgs:
pt.add_row(
[
i["asg_name"],
i["asg_size_current"],
i["asg_size_min"],
i["asg_size_max"],
i["asg_target_group"],
]
)
hf.pretty_table_print(pt, sort_key="Name")
def pprint_dns_records(self):
"""Displays tables on A and CNAME records"""
fields = ["Type", "Name", "Destination"]
alignments = {"Name": "l", "Destination": "l"}
# NOTE: `ar` is "A records", `cr` is "CNAME records", `lr` is "Alias records"
ar = hf.pretty_table_create(fields, **alignments)
cr = hf.pretty_table_create(fields, **alignments)
lr = hf.pretty_table_create(fields, **alignments)
response = self.dns_zone_records()
for i in response["ResourceRecordSets"]:
if i["Type"] == "A" and i.get("ResourceRecords"):
ar.add_row([i["Type"], i["Name"], i["ResourceRecords"][0]["Value"]])
if i["Type"] == "CNAME":
cr.add_row([i["Type"], i["Name"], i["ResourceRecords"][0]["Value"]])
# print alias type records
if i["Type"] == "A" and i.get("AliasTarget"):
lr.add_row([i["Type"], i["Name"], i["AliasTarget"]["DNSName"]])
hf.pretty_table_print(cr, sort_key="Name")
hf.pretty_table_print(ar, sort_key="Name")
hf.pretty_table_print(lr, sort_key="Name")
def pprint_ec2_instances(self):
"""Displays pretty table of EC2 Instances"""
fields = ["Group", "Name", "ID", "State", "Type", "Internal IP", "Public IP", "IPv6", "Image", "AZ"]
alignments = {"Group": "l", "Name": "l"}
pt = hf.pretty_table_create(fields, **alignments)
instances = self.ec2_get_instances()
for i in instances:
pt.add_row(
[
i["group_name"],
i["instance_name"],
i["instance_id"],
i["instance_state"],
i["instance_type"],
i["instance_private_ip"],
i["instance_public_ip"],
i["ipv6"],
i["image_name"],
i["instance_az"],
]
)
hf.pretty_table_print(pt, sort_key="Name")
def pprint_load_balancers(self):
"""Displays pretty table of Elastic Load Balancers"""
fields = ["Name", "Type", "DNS", "Protocol", "Port", "Default Action"]
alignments = {"Name": "l"}
pt = hf.pretty_table_create(fields, **alignments)
details = self.elb_aggregate()
for i in details:
pt.add_row(
[
i["lb_name"],
i["lb_type"],
i["lb_dns"],
i["listener_protocal"],
i["listener_port"],
i["listener_default_action_type"],
]
)
hf.pretty_table_print(pt, sort_key="Name")
def tag_get_value(self, tags, label, default_value=None):
"""Gets value of specific key from list of tags
Args:
tags (list): list of key-value pairs
label (string): the key of the value you want
default_value (string): if you need to return a default string if no tag present
Returns:
string: of value if successful, otherwise return default_value
"""
if tags:
for tag in tags:
if tag["Key"] == label:
return tag["Value"]
return default_value
def get_elb_by_tag(self, key="Name", value="tag-linecas-eks-ingress-nginx"):
"""Used to get a specific ELB based upon key-value pairs
Args:
key (string): which key to get
value (string): the value of the key
Returns:
dict: the full response returned by AWS
dict: a subset of specific items we desire
"""
lb = {}
response = self._tags.get_resources(
# PaginationToken=token,
TagFilters=[{"Key": key, "Values": [value]}],
ResourcesPerPage=50,
ResourceTypeFilters=[
"elasticloadbalancing:loadbalancer",
],
)
if self.debug:
pprint(response)
# TODO: analyze for loop
for i in response.get("ResourceTagMappingList"):
lb["arn"] = i["ResourceARN"]
lb["name"] = i["ResourceARN"].rsplit("/", 1)[-1]
logger.info(f"LB: {lb}")
return response, lb
def elb_get_lb(self, arn=None, name="ad5e2189ea4a34164901c67732d0657c"):
"""Get details on LB based on ARN or Name
Args:
arn (string): Unique ARN string
name (string): Name of the LB
Returns:
dict: Detailed response of LB
string: The DNS record value
"""
dns_name = None
if arn:
# NOTE: was having problems with ARN, need to evaluate further
logger.info(f"Using LB ARN: {arn}")
response = self._elb_v1.describe_load_balancers(LoadBalancerArns=[arn])
else:
logger.info(f"Using LB Name: {name}")
response = self._elb_v1.describe_load_balancers(LoadBalancerNames=[name])
if self.debug:
pprint(response)
for i in response.get("LoadBalancerDescriptions"):
dns_name = i["DNSName"]
logger.info(f"DNS Name: {dns_name}")
return response, dns_name
def get_billing_amount(self, start_date="2021-01-01", end_date="2021-02-01"):
"""Calls cost_and_usage API to get billing information, and adds to Pretty Table
Args:
start_date (string): In format "2021-01-01"
end_date (string): In format "2021-02-01"
Returns:
float: of bill amount (though this is not actively used)
"""
response = self._ce.get_cost_and_usage(
TimePeriod={"Start": start_date, "End": end_date},
Granularity="MONTHLY",
Metrics=["BlendedCost"],
GroupBy=[
{"Type": "TAG", "Key": "Project"},
],
)
if self.debug:
pprint(response)
logger.debug(f"get_cost_and_usage response: {response}")
amount = response.get("ResultsByTime")[0].get("Groups")[0].get("Metrics").get("BlendedCost").get("Amount")
bill_amount = round(Decimal(amount), 2)
logger.info(f"BILL_AMOUNT: {bill_amount}, START_DATE={start_date}, END_DATE={end_date}")
self.bt.add_row([start_date, end_date, bill_amount])
return bill_amount
def get_billing_month(self, delta):
"""Gets relative billing date
Args:
delta (int): Relative month time period to current date
Returns:
string: In format YYYY-MM-01
"""
logger.debug(f"MONTH_DELTA: {delta}")
bill_date = date.today() + relativedelta(months=delta)
return_date = bill_date.strftime("%Y-%m-01")
logger.debug(f"RETURN_DATE: {return_date}")
return return_date
def billing_function(self):
"""Gets current and last 3 bill amounts"""
month_plus_1 = self.get_billing_month(+1)
month_current = self.get_billing_month(0)
month_minus_1 = self.get_billing_month(-1)
month_minus_2 = self.get_billing_month(-2)
month_minus_3 = self.get_billing_month(-3)
# Billing Table
fields = ["Start Date", "End Date", "Bill Amount"]
self.bt = hf.pretty_table_create(fields)
self.get_billing_amount(month_current, month_plus_1)
self.get_billing_amount(month_minus_1, month_current)
self.get_billing_amount(month_minus_2, month_minus_1)
self.get_billing_amount(month_minus_3, month_minus_2)
hf.pretty_table_print(self.bt)
def get_tag_value(self, tags, label):
"""Gets value of specific key from list of tags
Args:
tags (list): list of key-value pairs
label (string): the key of the value you want
Returns:
string: of value if successful, None otherwise
"""
for tag in tags:
if tag["Key"] == label:
return tag["Value"]
# Default return
return None
__init__(self, domain='linecas.com', debug=False)
special
Sets up the clients and DNS information for interaction with AWS
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
domain |
string |
DNS zone to use. If None, default is linecas.com |
'linecas.com' |
debug |
bool |
Used for pprint and log debugging |
False |
Source code in linecaspy/aws/aws.py
def __init__(self, domain="linecas.com", debug=False):
"""Sets up the clients and DNS information for interaction with AWS
Args:
domain (string): DNS zone to use. If None, default is linecas.com
debug (bool): Used for pprint and log debugging
"""
self.debug = debug
self.ami_image_cache = []
logger.info(f"Intializing the AWS Class using Domain: {domain} with debug: {debug}")
# Clients
self._as = boto3.client("autoscaling")
self._ce = boto3.client("ce")
self._ec2 = boto3.client("ec2")
self._eks = boto3.client("eks")
self._elb_v1 = boto3.client("elb") # NOTE: not elbv2
self._elb = boto3.client("elbv2") # NOTE: not elb
self._r53 = boto3.client("route53")
self._tags = boto3.client("resourcegroupstaggingapi")
# DNS
self.dns_zone_name = domain
self.dns_zone_id = None
self.dns_ttl = 90
ami_get_friendly_name(self, ami_id, words=3)
Gets a shortened name of Image used
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
ami_id |
string |
ID of the AMI |
required |
words |
int |
of the first X amount of strings in name |
3 |
Returns:
| Type | Description |
|---|---|
string |
a human comprehensible name |
Source code in linecaspy/aws/aws.py
def ami_get_friendly_name(self, ami_id, words=3):
"""Gets a shortened name of Image used
Args:
ami_id (string): ID of the AMI
words (int): of the first X amount of strings in name
Returns:
string: a human comprehensible name
"""
# first check if we already looked up this AMI ID
for ami in self.ami_image_cache:
for key, value in ami.items():
if key == ami_id:
friendly_name = value
logger.debug(f"Found {key} with {value}")
return friendly_name
# if not found, then we'll make an API call to discover
response = self._ec2.describe_images(ImageIds=[ami_id])
logger.debug(f"RESPONSE: {response}")
friendly_name = None
images = response.get("Images")
for image in images:
image_name = image.get("Name")
logger.debug(f"IMAGE_NAME: {image_name}")
friendly_name = " ".join(image_name.split()[: int(words)])
# and add that discovery to the image cache
image_dict = {ami_id: friendly_name}
self.ami_image_cache.append(image_dict)
return friendly_name
autoscaling_filter_groups(self)
Filters on specific subset of information in autoscaling groups
Returns:
| Type | Description |
|---|---|
list |
of dictionaries on autoscaling groups |
Source code in linecaspy/aws/aws.py
def autoscaling_filter_groups(self):
"""Filters on specific subset of information in autoscaling groups
Returns:
list: of dictionaries on autoscaling groups
"""
all_asgs = []
asgs = self.autoscaling_get_groups()
for asg in asgs:
i = {}
i["asg_arn"] = asg.get("AutoScalingGroupARN")
i["asg_name"] = asg.get("AutoScalingGroupName")
i["asg_size_current"] = len(asg.get("Instances"))
i["asg_size_max"] = asg.get("MaxSize")
i["asg_size_min"] = asg.get("MinSize")
try:
# NOTE: assumes ASG will have single target
i["asg_target_group"] = asg.get("TargetGroupARNs")[0]
except IndexError:
i["asg_target_group"] = None
all_asgs.append(i)
if self.debug:
logger.debug("filtered response:")
pprint(all_asgs)
return all_asgs
autoscaling_get_groups(self, asg_name=None)
Collects information on all autoscaling groups
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
asg_name |
string |
name of specific Autoscaling Group |
None |
Returns:
| Type | Description |
|---|---|
list |
of dictionaries on autoscaling groups |
Source code in linecaspy/aws/aws.py
def autoscaling_get_groups(self, asg_name=None):
"""Collects information on all autoscaling groups
Args:
asg_name (string): name of specific Autoscaling Group
Returns:
list: of dictionaries on autoscaling groups
"""
if asg_name:
response = self._as.describe_auto_scaling_groups(AutoScalingGroupNames=[asg_name])
else:
response = self._as.describe_auto_scaling_groups()
if self.debug:
logger.debug("get response:")
pprint(response.get("AutoScalingGroups"))
logger.info(f"There is {len(response.get('AutoScalingGroups'))} Autoscaling-groups(s) in search query")
return response.get("AutoScalingGroups")
billing_function(self)
Gets current and last 3 bill amounts
Source code in linecaspy/aws/aws.py
def billing_function(self):
"""Gets current and last 3 bill amounts"""
month_plus_1 = self.get_billing_month(+1)
month_current = self.get_billing_month(0)
month_minus_1 = self.get_billing_month(-1)
month_minus_2 = self.get_billing_month(-2)
month_minus_3 = self.get_billing_month(-3)
# Billing Table
fields = ["Start Date", "End Date", "Bill Amount"]
self.bt = hf.pretty_table_create(fields)
self.get_billing_amount(month_current, month_plus_1)
self.get_billing_amount(month_minus_1, month_current)
self.get_billing_amount(month_minus_2, month_minus_1)
self.get_billing_amount(month_minus_3, month_minus_2)
hf.pretty_table_print(self.bt)
dns_delete_record_function(self, name)
Handler that calls the "dns_zone_records" and "dns_delete_records" functions
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name |
string |
name of DNS record, will be expanded to FQDN |
required |
Source code in linecaspy/aws/aws.py
def dns_delete_record_function(self, name):
"""Handler that calls the "dns_zone_records" and "dns_delete_records" functions
Args:
name (string): name of DNS record, will be expanded to FQDN
"""
fqdn = f"{name}.{self.dns_zone_name}."
logger.info(f"Looking for record with name: {fqdn}")
response = self.dns_zone_records()
for i in response["ResourceRecordSets"]:
if i["Name"] == fqdn:
logger.info(f"Deleting DNS Record: {i}")
d_name = i["Name"]
d_type = i["Type"]
d_ttl = i["TTL"]
d_rr = i["ResourceRecords"]
self.dns_delete_records(d_name, d_type, d_ttl, d_rr)
dns_delete_records(self, name, type_r, ttl, resources)
Performs the deletion of DNS record
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name |
string |
Name of record |
required |
type_r |
string |
Type of record like A or CNAME |
required |
ttl |
string |
Time To Live value |
required |
resources |
list |
Destination ResourceRecords |
required |
Source code in linecaspy/aws/aws.py
def dns_delete_records(self, name, type_r, ttl, resources):
"""Performs the deletion of DNS record
Args:
name (string): Name of record
type_r (string): Type of record like A or CNAME
ttl (string): Time To Live value
resources (list): Destination ResourceRecords
"""
logger.info(f"Received deletion request for {name}")
response = self._r53.change_resource_record_sets(
ChangeBatch={
"Changes": [
{
"Action": "DELETE",
"ResourceRecordSet": {
"Name": name,
"ResourceRecords": resources,
"Type": type_r,
"TTL": ttl,
},
},
]
},
HostedZoneId=self.dns_zone_id,
)
if self.debug:
pprint(response)
logger.info(f"RESPONSE: {response}")
dns_upsert_record(self, source, dest, r_type='CNAME')
Creates or updates DNS record
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
source |
string |
DNS value you want to map |
required |
dest |
string |
DNS value you want to map to |
required |
r_type |
string |
DNS record type, typically 'A' or 'CNAME' |
'CNAME' |
Returns:
| Type | Description |
|---|---|
dict |
response of DNS event |
Source code in linecaspy/aws/aws.py
def dns_upsert_record(self, source, dest, r_type="CNAME"):
"""Creates or updates DNS record
Args:
source (string): DNS value you want to map
dest (string): DNS value you want to map to
r_type (string): DNS record type, typically 'A' or 'CNAME'
Returns:
dict: response of DNS event
"""
if self.dns_zone_id is None:
self.dns_zone_id = self.dns_zones(self.dns_zone_name)
response = self._r53.change_resource_record_sets(
HostedZoneId=self.dns_zone_id,
ChangeBatch={
"Comment": f"add {source} -> {dest}",
"Changes": [
{
"Action": "UPSERT",
"ResourceRecordSet": {
"Name": source,
"Type": r_type,
"TTL": self.dns_ttl,
"ResourceRecords": [{"Value": dest}],
},
}
],
},
)
if self.debug:
pprint(response)
logger.info(f"RESPONSE: {response}")
return response
dns_zone_records(self)
Gets all DNS records in Zone
Returns:
| Type | Description |
|---|---|
(dict) |
of DNS Records |
Source code in linecaspy/aws/aws.py
def dns_zone_records(self):
"""Gets all DNS records in Zone
Returns:
(dict): of DNS Records
"""
if self.dns_zone_id is None:
self.dns_zone_id = self.dns_zones(self.dns_zone_name)
response = self._r53.list_resource_record_sets(HostedZoneId=self.dns_zone_id)
if self.debug:
pprint(response)
return response
dns_zones(self, domain)
Logs information and returns information on DNS Zones
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
domain |
string |
The DNS zone name, like "linecas.com" |
required |
Returns:
| Type | Description |
|---|---|
(string) |
The DNS Zone ID |
Source code in linecaspy/aws/aws.py
def dns_zones(self, domain):
"""Logs information and returns information on DNS Zones
Args:
domain (string): The DNS zone name, like "linecas.com"
Returns:
(string): The DNS Zone ID
"""
zones = self._r53.list_hosted_zones()
if self.debug:
pprint(zones)
for i in zones["HostedZones"]:
logger.info(f"{i['Name']} has {i['ResourceRecordSetCount']} records")
if i["Name"] == f"{domain}.":
zone_id = i["Id"]
logger.info(f"Domain Name: {domain} has Zone ID: {zone_id}")
self.dns_zone_id = zone_id
return zone_id
ec2_get_instances(self)
Collects information on all ec2 instances
Returns:
| Type | Description |
|---|---|
list |
of dictionaries on instances |
Source code in linecaspy/aws/aws.py
def ec2_get_instances(self):
"""Collects information on all ec2 instances
Returns:
list: of dictionaries on instances
"""
all_instances = []
response = self._ec2.describe_instances()
for reservation in response["Reservations"]:
for instance in reservation["Instances"]:
if self.debug:
pprint(instance)
i = {}
i["instance_tags"] = instance.get("Tags")
i["group_name"] = self.tag_get_value(i["instance_tags"], label="ansible_group", default_value="N/A")
i["instance_name"] = self.tag_get_value(i["instance_tags"], label="Name", default_value="N/A")
i["instance_id"] = instance.get("InstanceId")
i["instance_state"] = instance.get("State").get("Name")
i["instance_type"] = instance.get("InstanceType")
i["instance_private_ip"] = instance.get("PrivateIpAddress")
i["instance_public_ip"] = instance.get("PublicIpAddress", None)
try:
i["ipv6"] = instance.get("NetworkInterfaces")[0].get("Ipv6Addresses")[0].get("Ipv6Address")
except IndexError:
i["ipv6"] = None
i["image_id"] = instance.get("ImageId")
i["image_name"] = self.ami_get_friendly_name(i["image_id"])
i["instance_az"] = instance.get("Placement").get("AvailabilityZone")
all_instances.append(i)
if self.debug:
pprint(all_instances)
logger.info(f"There is {len(all_instances)} total instances")
return all_instances
ec2_start_instance(self, instance_id)
Starts an EC2 instance based on AWS starter code
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
instance_id |
string |
The value of the EC2 Instane ID |
required |
Source code in linecaspy/aws/aws.py
def ec2_start_instance(self, instance_id):
"""Starts an EC2 instance based on AWS starter code
Args:
instance_id (string): The value of the EC2 Instane ID
"""
# Do a dryrun first to verify permissions
try:
self._ec2.start_instances(InstanceIds=[instance_id], DryRun=True)
except ClientError as e:
if "DryRunOperation" not in str(e):
raise
# Dry run succeeded, run start_instances without dryrun
try:
response = self._ec2.start_instances(InstanceIds=[instance_id], DryRun=False)
logger.info(f"RESPONSE: {response}")
except ClientError as e:
logger.error(f"ERROR: {e}")
sys.exit(1)
eks_function(self)
Configures and returns information about EKS Cluster
EKS could be in States: 1) Does not exist 2) Exists but without LB (rarity) 3) Fully exists
Source code in linecaspy/aws/aws.py
def eks_function(self):
"""Configures and returns information about EKS Cluster
EKS could be in States:
1) Does not exist
2) Exists but without LB (rarity)
3) Fully exists
"""
cluster = self.eks_get_cluster()
if cluster:
# setup pretty table
fields = ["Key", "Value"]
alignments = {"Key": "l", "Value": "l"}
pt = hf.pretty_table_create(fields, **alignments)
# get data
lb_tag_resp, lb_tag_dict = self.get_elb_by_tag()
lb, lb_dns = self.elb_get_lb(arn=None, name=lb_tag_dict.get("name"))
self.dns_upsert_record(source="eks.linecas.com", dest=lb_dns)
# add info to table
pt.add_row(["eks_name", cluster.get("cluster").get("name")])
pt.add_row(["eks_version", cluster.get("cluster").get("version")])
pt.add_row(["eks_endpoint", cluster.get("cluster").get("endpoint")])
pt.add_row(["ingress_lb_name", lb_tag_dict.get("name")])
pt.add_row(["ingress_lb_dns", lb_dns])
hf.pretty_table_print(pt, sort_key="Key")
else:
return
eks_get_cluster(self, cluster='linecas-eks')
Gets details on EKS Cluster
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
cluster |
string |
name of the EKS cluster |
'linecas-eks' |
Returns:
| Type | Description |
|---|---|
(dict) |
response of cluster details otherwise returns None |
Source code in linecaspy/aws/aws.py
def eks_get_cluster(self, cluster="linecas-eks"):
"""Gets details on EKS Cluster
Args:
cluster (string): name of the EKS cluster
Returns:
(dict): response of cluster details otherwise returns None
"""
logger.info(f"Getting info on EKS Cluster: {cluster}")
try:
response = self._eks.describe_cluster(name=cluster)
except self._eks.exceptions.ResourceNotFoundException:
logger.warning(f"EKS Cluster: {cluster} does not exist")
response = None
if self.debug and response:
pprint(response)
return response
elb_aggregate(self)
Aggregates Load Balancer and Listener configurations
Returns:
| Type | Description |
|---|---|
list |
of dictionaries on lbs and listeners |
Source code in linecaspy/aws/aws.py
def elb_aggregate(self):
"""Aggregates Load Balancer and Listener configurations
Returns:
list: of dictionaries on lbs and listeners
"""
all_lbs = []
details = []
lbs = self.elb_get_loadbalancers()
for lb in lbs:
i = {}
i["lb_arn"] = lb.get("LoadBalancerArn")
i["lb_dns"] = lb.get("DNSName")
i["lb_name"] = lb.get("LoadBalancerName")
i["lb_type"] = lb.get("Type")
all_lbs.append(i)
for lb in all_lbs:
listeners = self.elb_get_listeners(lb["lb_arn"])
for listener in listeners:
j = {}
j["listener_arn"] = listener.get("ListenerArn")
j["listener_default_action"] = listener.get("DefaultActions")[0]
j["listener_default_action_type"] = listener.get("DefaultActions")[0].get("Type")
j["listener_port"] = listener.get("Port")
j["listener_protocal"] = listener.get("Protocol")
# combine the lb and listener dictionaries next
merge = {**lb, **j}
details.append(merge)
if self.debug:
logger.debug("Details on Load Balancers and Listeners")
pprint(details)
logger.info(f"There is {len(details)} listeners across {len(lbs)} load balancers")
return details
elb_describe_rules(self, listener_arn)
Collects information on rules in LB Listener
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
listener_arn |
string |
Unique ARN ID of LB Listener |
required |
Returns:
| Type | Description |
|---|---|
list |
of dictionaries on listeners |
Source code in linecaspy/aws/aws.py
def elb_describe_rules(self, listener_arn):
"""Collects information on rules in LB Listener
Args:
listener_arn (string): Unique ARN ID of LB Listener
Returns:
list: of dictionaries on listeners
"""
response = self._elb.describe_rules(ListenerArn=listener_arn)
if self.debug:
pprint(response)
logger.info(f"There is {len(response.get('Rules'))} rules(s) in search query")
return response.get("Rules")
elb_get_lb(self, arn=None, name='ad5e2189ea4a34164901c67732d0657c')
Get details on LB based on ARN or Name
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
arn |
string |
Unique ARN string |
None |
name |
string |
Name of the LB |
'ad5e2189ea4a34164901c67732d0657c' |
Returns:
| Type | Description |
|---|---|
dict |
Detailed response of LB string: The DNS record value |
Source code in linecaspy/aws/aws.py
def elb_get_lb(self, arn=None, name="ad5e2189ea4a34164901c67732d0657c"):
"""Get details on LB based on ARN or Name
Args:
arn (string): Unique ARN string
name (string): Name of the LB
Returns:
dict: Detailed response of LB
string: The DNS record value
"""
dns_name = None
if arn:
# NOTE: was having problems with ARN, need to evaluate further
logger.info(f"Using LB ARN: {arn}")
response = self._elb_v1.describe_load_balancers(LoadBalancerArns=[arn])
else:
logger.info(f"Using LB Name: {name}")
response = self._elb_v1.describe_load_balancers(LoadBalancerNames=[name])
if self.debug:
pprint(response)
for i in response.get("LoadBalancerDescriptions"):
dns_name = i["DNSName"]
logger.info(f"DNS Name: {dns_name}")
return response, dns_name
elb_get_listeners(self, lb_arn)
Collects information on listeners in LB
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
lb_arn |
string |
Unique ARN ID of LB |
required |
Returns:
| Type | Description |
|---|---|
list |
of dictionaries on listeners |
Source code in linecaspy/aws/aws.py
def elb_get_listeners(self, lb_arn):
"""Collects information on listeners in LB
Args:
lb_arn (string): Unique ARN ID of LB
Returns:
list: of dictionaries on listeners
"""
response = self._elb.describe_listeners(LoadBalancerArn=lb_arn)
if self.debug:
logger.debug("get response:")
pprint(response.get("Listeners"))
logger.info(f"There is {len(response.get('Listeners'))} listener(s) in search query")
return response.get("Listeners")
elb_get_loadbalancers(self, lb_name=None)
Collects information on all load balancers
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
lb_name |
string |
optional field to get single LB |
None |
Returns:
| Type | Description |
|---|---|
list |
of dictionaries on loadbalancers |
Source code in linecaspy/aws/aws.py
def elb_get_loadbalancers(self, lb_name=None):
"""Collects information on all load balancers
Args:
lb_name (string): optional field to get single LB
Returns:
list: of dictionaries on loadbalancers
"""
if lb_name:
response = self._elb.describe_load_balancers(Names=[lb_name])
else:
response = self._elb.describe_load_balancers()
if self.debug:
pprint(response.get("LoadBalancers"))
logger.info(f"There is {len(response.get('LoadBalancers'))} loadbalancer(s) in search query")
return response.get("LoadBalancers")
elb_modify_listener(self, listener_arn, action, fixed_message=None, forward_target_arn=None, redirect_port='80', redirect_url=None)
Modifies the default action of a listener
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
listener_arn |
string |
ID of LB Listener |
required |
action |
string |
Must be one of redirect/fixed/forward |
required |
fixed_message |
string |
if action=fixed, the message to display |
None |
forward_target_arn |
string |
if action=forward, the ASG target |
None |
redirect_port |
string |
if action=redirect with string of destination port |
'80' |
redirect_url |
string |
if action=redirect the url to 302 redirect to |
None |
Source code in linecaspy/aws/aws.py
def elb_modify_listener(
self, listener_arn, action, fixed_message=None, forward_target_arn=None, redirect_port="80", redirect_url=None
):
"""Modifies the default action of a listener
Args:
listener_arn (string): ID of LB Listener
action (string): Must be one of redirect/fixed/forward
fixed_message (string): if action=fixed, the message to display
forward_target_arn (string): if action=forward, the ASG target
redirect_port (string): if action=redirect with string of destination port
redirect_url (string): if action=redirect the url to 302 redirect to
"""
if action == "redirect":
if not redirect_url:
logger.error(f"redirect_url is: {redirect_url}")
sys.exit(1)
if str(redirect_port) == "443":
protocol = "HTTPS"
else:
protocol = "HTTP"
response = self._elb.modify_listener(
ListenerArn=listener_arn,
DefaultActions=[
{
"Type": "redirect",
"RedirectConfig": {
"Protocol": protocol,
"Port": str(redirect_port),
"Host": redirect_url,
"Path": "/",
"Query": "",
"StatusCode": "HTTP_302",
},
},
],
)
elif action == "fixed":
if not fixed_message:
logger.error(f"fixed_message is: {fixed_message}")
sys.exit(1)
response = self._elb.modify_listener(
ListenerArn=listener_arn,
DefaultActions=[
{
"Type": "fixed-response",
"FixedResponseConfig": {
"MessageBody": fixed_message,
"StatusCode": "200",
"ContentType": "text/html",
},
}
],
)
elif action == "forward":
if not forward_target_arn:
logger.error(f"forward_target_arn is: {forward_target_arn}")
sys.exit(1)
response = self._elb.modify_listener(
ListenerArn=listener_arn,
DefaultActions=[
{
"Type": "forward",
"ForwardConfig": {
"TargetGroups": [
{"TargetGroupArn": forward_target_arn, "Weight": 1},
],
"TargetGroupStickinessConfig": {"Enabled": False},
},
}
],
)
else:
logger.error("This function should have never gotten here")
sys.exit(1)
if self.debug:
pprint(response)
response_code = response.get("ResponseMetadata").get("HTTPStatusCode")
if response_code != 200:
logger.error(f"response_code returned {response_code}")
sys.exit(1)
else:
logger.info(f"Updating listener returned status code: {response_code}")
get_billing_amount(self, start_date='2021-01-01', end_date='2021-02-01')
Calls cost_and_usage API to get billing information, and adds to Pretty Table
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
start_date |
string |
In format "2021-01-01" |
'2021-01-01' |
end_date |
string |
In format "2021-02-01" |
'2021-02-01' |
Returns:
| Type | Description |
|---|---|
float |
of bill amount (though this is not actively used) |
Source code in linecaspy/aws/aws.py
def get_billing_amount(self, start_date="2021-01-01", end_date="2021-02-01"):
"""Calls cost_and_usage API to get billing information, and adds to Pretty Table
Args:
start_date (string): In format "2021-01-01"
end_date (string): In format "2021-02-01"
Returns:
float: of bill amount (though this is not actively used)
"""
response = self._ce.get_cost_and_usage(
TimePeriod={"Start": start_date, "End": end_date},
Granularity="MONTHLY",
Metrics=["BlendedCost"],
GroupBy=[
{"Type": "TAG", "Key": "Project"},
],
)
if self.debug:
pprint(response)
logger.debug(f"get_cost_and_usage response: {response}")
amount = response.get("ResultsByTime")[0].get("Groups")[0].get("Metrics").get("BlendedCost").get("Amount")
bill_amount = round(Decimal(amount), 2)
logger.info(f"BILL_AMOUNT: {bill_amount}, START_DATE={start_date}, END_DATE={end_date}")
self.bt.add_row([start_date, end_date, bill_amount])
return bill_amount
get_billing_month(self, delta)
Gets relative billing date
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
delta |
int |
Relative month time period to current date |
required |
Returns:
| Type | Description |
|---|---|
string |
In format YYYY-MM-01 |
Source code in linecaspy/aws/aws.py
def get_billing_month(self, delta):
"""Gets relative billing date
Args:
delta (int): Relative month time period to current date
Returns:
string: In format YYYY-MM-01
"""
logger.debug(f"MONTH_DELTA: {delta}")
bill_date = date.today() + relativedelta(months=delta)
return_date = bill_date.strftime("%Y-%m-01")
logger.debug(f"RETURN_DATE: {return_date}")
return return_date
get_elb_by_tag(self, key='Name', value='tag-linecas-eks-ingress-nginx')
Used to get a specific ELB based upon key-value pairs
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key |
string |
which key to get |
'Name' |
value |
string |
the value of the key |
'tag-linecas-eks-ingress-nginx' |
Returns:
| Type | Description |
|---|---|
dict |
the full response returned by AWS dict: a subset of specific items we desire |
Source code in linecaspy/aws/aws.py
def get_elb_by_tag(self, key="Name", value="tag-linecas-eks-ingress-nginx"):
"""Used to get a specific ELB based upon key-value pairs
Args:
key (string): which key to get
value (string): the value of the key
Returns:
dict: the full response returned by AWS
dict: a subset of specific items we desire
"""
lb = {}
response = self._tags.get_resources(
# PaginationToken=token,
TagFilters=[{"Key": key, "Values": [value]}],
ResourcesPerPage=50,
ResourceTypeFilters=[
"elasticloadbalancing:loadbalancer",
],
)
if self.debug:
pprint(response)
# TODO: analyze for loop
for i in response.get("ResourceTagMappingList"):
lb["arn"] = i["ResourceARN"]
lb["name"] = i["ResourceARN"].rsplit("/", 1)[-1]
logger.info(f"LB: {lb}")
return response, lb
get_tag_value(self, tags, label)
Gets value of specific key from list of tags
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
tags |
list |
list of key-value pairs |
required |
label |
string |
the key of the value you want |
required |
Returns:
| Type | Description |
|---|---|
string |
of value if successful, None otherwise |
Source code in linecaspy/aws/aws.py
def get_tag_value(self, tags, label):
"""Gets value of specific key from list of tags
Args:
tags (list): list of key-value pairs
label (string): the key of the value you want
Returns:
string: of value if successful, None otherwise
"""
for tag in tags:
if tag["Key"] == label:
return tag["Value"]
# Default return
return None
pprint_auto_scaling_groups(self)
Displays pretty table of Autoscaling Groups
Source code in linecaspy/aws/aws.py
def pprint_auto_scaling_groups(self):
"""Displays pretty table of Autoscaling Groups"""
fields = ["Name", "Current Size", "Min Size", "Max Size", "Target"]
alignments = {"Name": "l"}
pt = hf.pretty_table_create(fields, **alignments)
asgs = self.autoscaling_filter_groups()
for i in asgs:
pt.add_row(
[
i["asg_name"],
i["asg_size_current"],
i["asg_size_min"],
i["asg_size_max"],
i["asg_target_group"],
]
)
hf.pretty_table_print(pt, sort_key="Name")
pprint_dns_records(self)
Displays tables on A and CNAME records
Source code in linecaspy/aws/aws.py
def pprint_dns_records(self):
"""Displays tables on A and CNAME records"""
fields = ["Type", "Name", "Destination"]
alignments = {"Name": "l", "Destination": "l"}
# NOTE: `ar` is "A records", `cr` is "CNAME records", `lr` is "Alias records"
ar = hf.pretty_table_create(fields, **alignments)
cr = hf.pretty_table_create(fields, **alignments)
lr = hf.pretty_table_create(fields, **alignments)
response = self.dns_zone_records()
for i in response["ResourceRecordSets"]:
if i["Type"] == "A" and i.get("ResourceRecords"):
ar.add_row([i["Type"], i["Name"], i["ResourceRecords"][0]["Value"]])
if i["Type"] == "CNAME":
cr.add_row([i["Type"], i["Name"], i["ResourceRecords"][0]["Value"]])
# print alias type records
if i["Type"] == "A" and i.get("AliasTarget"):
lr.add_row([i["Type"], i["Name"], i["AliasTarget"]["DNSName"]])
hf.pretty_table_print(cr, sort_key="Name")
hf.pretty_table_print(ar, sort_key="Name")
hf.pretty_table_print(lr, sort_key="Name")
pprint_ec2_instances(self)
Displays pretty table of EC2 Instances
Source code in linecaspy/aws/aws.py
def pprint_ec2_instances(self):
"""Displays pretty table of EC2 Instances"""
fields = ["Group", "Name", "ID", "State", "Type", "Internal IP", "Public IP", "IPv6", "Image", "AZ"]
alignments = {"Group": "l", "Name": "l"}
pt = hf.pretty_table_create(fields, **alignments)
instances = self.ec2_get_instances()
for i in instances:
pt.add_row(
[
i["group_name"],
i["instance_name"],
i["instance_id"],
i["instance_state"],
i["instance_type"],
i["instance_private_ip"],
i["instance_public_ip"],
i["ipv6"],
i["image_name"],
i["instance_az"],
]
)
hf.pretty_table_print(pt, sort_key="Name")
pprint_load_balancers(self)
Displays pretty table of Elastic Load Balancers
Source code in linecaspy/aws/aws.py
def pprint_load_balancers(self):
"""Displays pretty table of Elastic Load Balancers"""
fields = ["Name", "Type", "DNS", "Protocol", "Port", "Default Action"]
alignments = {"Name": "l"}
pt = hf.pretty_table_create(fields, **alignments)
details = self.elb_aggregate()
for i in details:
pt.add_row(
[
i["lb_name"],
i["lb_type"],
i["lb_dns"],
i["listener_protocal"],
i["listener_port"],
i["listener_default_action_type"],
]
)
hf.pretty_table_print(pt, sort_key="Name")
tag_get_value(self, tags, label, default_value=None)
Gets value of specific key from list of tags
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
tags |
list |
list of key-value pairs |
required |
label |
string |
the key of the value you want |
required |
default_value |
string |
if you need to return a default string if no tag present |
None |
Returns:
| Type | Description |
|---|---|
string |
of value if successful, otherwise return default_value |
Source code in linecaspy/aws/aws.py
def tag_get_value(self, tags, label, default_value=None):
"""Gets value of specific key from list of tags
Args:
tags (list): list of key-value pairs
label (string): the key of the value you want
default_value (string): if you need to return a default string if no tag present
Returns:
string: of value if successful, otherwise return default_value
"""
if tags:
for tag in tags:
if tag["Key"] == label:
return tag["Value"]
return default_value