Skip to content

AWS

Sample Commands

on-aws --azs
on-aws --billing
on-aws --delete-record xyz
on-aws --dns
on-aws --eks
on-aws --instances
Source code in linecaspy/aws/aws.py
class AwsClass:
    def __init__(self, domain="linecas.com", debug=False):
        """Sets up the clients and DNS information for interaction with AWS

        Args:
            domain (string): DNS zone to use.  If None, default is linecas.com
            debug (bool): Used for pprint and log debugging
        """
        self.debug = debug
        self.ami_image_cache = []
        logger.info(f"Intializing the AWS Class using Domain: {domain} with debug: {debug}")
        # Clients
        self._as = boto3.client("autoscaling")
        self._ce = boto3.client("ce")
        self._ec2 = boto3.client("ec2")
        self._eks = boto3.client("eks")
        self._elb_v1 = boto3.client("elb")  # NOTE: not elbv2
        self._elb = boto3.client("elbv2")  # NOTE: not elb
        self._r53 = boto3.client("route53")
        self._tags = boto3.client("resourcegroupstaggingapi")
        # DNS
        self.dns_zone_name = domain
        self.dns_zone_id = None
        self.dns_ttl = 90

    def ami_get_friendly_name(self, ami_id, words=3):
        """Gets a shortened name of Image used

        Args:
            ami_id (string): ID of the AMI
            words (int): of the first X amount of strings in name

        Returns:
            string: a human comprehensible name
        """
        # first check if we already looked up this AMI ID
        for ami in self.ami_image_cache:
            for key, value in ami.items():
                if key == ami_id:
                    friendly_name = value
                    logger.debug(f"Found {key} with {value}")
                    return friendly_name
        # if not found, then we'll make an API call to discover
        response = self._ec2.describe_images(ImageIds=[ami_id])
        logger.debug(f"RESPONSE: {response}")
        friendly_name = None
        images = response.get("Images")
        for image in images:
            image_name = image.get("Name")
            logger.debug(f"IMAGE_NAME: {image_name}")
            friendly_name = " ".join(image_name.split()[: int(words)])
            # and add that discovery to the image cache
            image_dict = {ami_id: friendly_name}
            self.ami_image_cache.append(image_dict)
        return friendly_name

    def autoscaling_get_groups(self, asg_name=None):
        """Collects information on all autoscaling groups

        Args:
            asg_name (string): name of specific Autoscaling Group

        Returns:
            list: of dictionaries on autoscaling groups
        """
        if asg_name:
            response = self._as.describe_auto_scaling_groups(AutoScalingGroupNames=[asg_name])
        else:
            response = self._as.describe_auto_scaling_groups()
        if self.debug:
            logger.debug("get response:")
            pprint(response.get("AutoScalingGroups"))
        logger.info(f"There is {len(response.get('AutoScalingGroups'))} Autoscaling-groups(s) in search query")
        return response.get("AutoScalingGroups")

    def autoscaling_filter_groups(self):
        """Filters on specific subset of information in autoscaling groups

        Returns:
            list: of dictionaries on autoscaling groups
        """
        all_asgs = []
        asgs = self.autoscaling_get_groups()
        for asg in asgs:
            i = {}
            i["asg_arn"] = asg.get("AutoScalingGroupARN")
            i["asg_name"] = asg.get("AutoScalingGroupName")
            i["asg_size_current"] = len(asg.get("Instances"))
            i["asg_size_max"] = asg.get("MaxSize")
            i["asg_size_min"] = asg.get("MinSize")
            try:
                # NOTE: assumes ASG will have single target
                i["asg_target_group"] = asg.get("TargetGroupARNs")[0]
            except IndexError:
                i["asg_target_group"] = None
            all_asgs.append(i)
        if self.debug:
            logger.debug("filtered response:")
            pprint(all_asgs)
        return all_asgs

    def dns_delete_records(self, name, type_r, ttl, resources):
        """Performs the deletion of DNS record

        Args:
            name (string): Name of record
            type_r (string): Type of record like A or CNAME
            ttl (string): Time To Live value
            resources (list): Destination ResourceRecords
        """
        logger.info(f"Received deletion request for {name}")
        response = self._r53.change_resource_record_sets(
            ChangeBatch={
                "Changes": [
                    {
                        "Action": "DELETE",
                        "ResourceRecordSet": {
                            "Name": name,
                            "ResourceRecords": resources,
                            "Type": type_r,
                            "TTL": ttl,
                        },
                    },
                ]
            },
            HostedZoneId=self.dns_zone_id,
        )
        if self.debug:
            pprint(response)
        logger.info(f"RESPONSE: {response}")

    def dns_upsert_record(self, source, dest, r_type="CNAME"):
        """Creates or updates DNS record

        Args:
            source (string): DNS value you want to map
            dest (string): DNS value you want to map to
            r_type (string): DNS record type, typically 'A' or 'CNAME'

        Returns:
            dict: response of DNS event
        """
        if self.dns_zone_id is None:
            self.dns_zone_id = self.dns_zones(self.dns_zone_name)
        response = self._r53.change_resource_record_sets(
            HostedZoneId=self.dns_zone_id,
            ChangeBatch={
                "Comment": f"add {source} -> {dest}",
                "Changes": [
                    {
                        "Action": "UPSERT",
                        "ResourceRecordSet": {
                            "Name": source,
                            "Type": r_type,
                            "TTL": self.dns_ttl,
                            "ResourceRecords": [{"Value": dest}],
                        },
                    }
                ],
            },
        )
        if self.debug:
            pprint(response)
        logger.info(f"RESPONSE: {response}")
        return response

    def dns_delete_record_function(self, name):
        """Handler that calls the "dns_zone_records" and "dns_delete_records" functions

        Args:
            name (string): name of DNS record, will be expanded to FQDN
        """
        fqdn = f"{name}.{self.dns_zone_name}."
        logger.info(f"Looking for record with name: {fqdn}")
        response = self.dns_zone_records()
        for i in response["ResourceRecordSets"]:
            if i["Name"] == fqdn:
                logger.info(f"Deleting DNS Record: {i}")
                d_name = i["Name"]
                d_type = i["Type"]
                d_ttl = i["TTL"]
                d_rr = i["ResourceRecords"]
                self.dns_delete_records(d_name, d_type, d_ttl, d_rr)

    def dns_zone_records(self):
        """Gets all DNS records in Zone

        Returns:
            (dict): of DNS Records
        """
        if self.dns_zone_id is None:
            self.dns_zone_id = self.dns_zones(self.dns_zone_name)
        response = self._r53.list_resource_record_sets(HostedZoneId=self.dns_zone_id)
        if self.debug:
            pprint(response)
        return response

    def dns_zones(self, domain):
        """Logs information and returns information on DNS Zones

        Args:
            domain (string): The DNS zone name, like "linecas.com"

        Returns:
            (string): The DNS Zone ID
        """
        zones = self._r53.list_hosted_zones()
        if self.debug:
            pprint(zones)
        for i in zones["HostedZones"]:
            logger.info(f"{i['Name']} has {i['ResourceRecordSetCount']} records")
            if i["Name"] == f"{domain}.":
                zone_id = i["Id"]
        logger.info(f"Domain Name: {domain} has Zone ID: {zone_id}")
        self.dns_zone_id = zone_id
        return zone_id

    def ec2_get_instances(self):
        """Collects information on all ec2 instances

        Returns:
            list: of dictionaries on instances
        """
        all_instances = []
        response = self._ec2.describe_instances()
        for reservation in response["Reservations"]:
            for instance in reservation["Instances"]:
                if self.debug:
                    pprint(instance)
                i = {}
                i["instance_tags"] = instance.get("Tags")
                i["group_name"] = self.tag_get_value(i["instance_tags"], label="ansible_group", default_value="N/A")
                i["instance_name"] = self.tag_get_value(i["instance_tags"], label="Name", default_value="N/A")
                i["instance_id"] = instance.get("InstanceId")
                i["instance_state"] = instance.get("State").get("Name")
                i["instance_type"] = instance.get("InstanceType")
                i["instance_private_ip"] = instance.get("PrivateIpAddress")
                i["instance_public_ip"] = instance.get("PublicIpAddress", None)
                try:
                    i["ipv6"] = instance.get("NetworkInterfaces")[0].get("Ipv6Addresses")[0].get("Ipv6Address")
                except IndexError:
                    i["ipv6"] = None
                i["image_id"] = instance.get("ImageId")
                i["image_name"] = self.ami_get_friendly_name(i["image_id"])
                i["instance_az"] = instance.get("Placement").get("AvailabilityZone")
                all_instances.append(i)
        if self.debug:
            pprint(all_instances)
        logger.info(f"There is {len(all_instances)} total instances")
        return all_instances

    def ec2_start_instance(self, instance_id):
        """Starts an EC2 instance based on AWS starter code

        Args:
            instance_id (string): The value of the EC2 Instane ID
        """
        # Do a dryrun first to verify permissions
        try:
            self._ec2.start_instances(InstanceIds=[instance_id], DryRun=True)
        except ClientError as e:
            if "DryRunOperation" not in str(e):
                raise

        # Dry run succeeded, run start_instances without dryrun
        try:
            response = self._ec2.start_instances(InstanceIds=[instance_id], DryRun=False)
            logger.info(f"RESPONSE: {response}")
        except ClientError as e:
            logger.error(f"ERROR: {e}")
            sys.exit(1)

    def eks_function(self):
        """Configures and returns information about EKS Cluster

        EKS could be in States:
          1) Does not exist
          2) Exists but without LB (rarity)
          3) Fully exists
        """
        cluster = self.eks_get_cluster()
        if cluster:
            # setup pretty table
            fields = ["Key", "Value"]
            alignments = {"Key": "l", "Value": "l"}
            pt = hf.pretty_table_create(fields, **alignments)
            # get data
            lb_tag_resp, lb_tag_dict = self.get_elb_by_tag()
            lb, lb_dns = self.elb_get_lb(arn=None, name=lb_tag_dict.get("name"))
            self.dns_upsert_record(source="eks.linecas.com", dest=lb_dns)
            # add info to table
            pt.add_row(["eks_name", cluster.get("cluster").get("name")])
            pt.add_row(["eks_version", cluster.get("cluster").get("version")])
            pt.add_row(["eks_endpoint", cluster.get("cluster").get("endpoint")])
            pt.add_row(["ingress_lb_name", lb_tag_dict.get("name")])
            pt.add_row(["ingress_lb_dns", lb_dns])
            hf.pretty_table_print(pt, sort_key="Key")
        else:
            return

    def eks_get_cluster(self, cluster="linecas-eks"):
        """Gets details on EKS Cluster

        Args:
            cluster (string): name of the EKS cluster

        Returns:
            (dict): response of cluster details otherwise returns None
        """
        logger.info(f"Getting info on EKS Cluster: {cluster}")
        try:
            response = self._eks.describe_cluster(name=cluster)
        except self._eks.exceptions.ResourceNotFoundException:
            logger.warning(f"EKS Cluster: {cluster} does not exist")
            response = None
        if self.debug and response:
            pprint(response)
        return response

    def elb_aggregate(self):
        """Aggregates Load Balancer and Listener configurations

        Returns:
            list: of dictionaries on lbs and listeners
        """
        all_lbs = []
        details = []
        lbs = self.elb_get_loadbalancers()
        for lb in lbs:
            i = {}
            i["lb_arn"] = lb.get("LoadBalancerArn")
            i["lb_dns"] = lb.get("DNSName")
            i["lb_name"] = lb.get("LoadBalancerName")
            i["lb_type"] = lb.get("Type")
            all_lbs.append(i)
        for lb in all_lbs:
            listeners = self.elb_get_listeners(lb["lb_arn"])
            for listener in listeners:
                j = {}
                j["listener_arn"] = listener.get("ListenerArn")
                j["listener_default_action"] = listener.get("DefaultActions")[0]
                j["listener_default_action_type"] = listener.get("DefaultActions")[0].get("Type")
                j["listener_port"] = listener.get("Port")
                j["listener_protocal"] = listener.get("Protocol")
                # combine the lb and listener dictionaries next
                merge = {**lb, **j}
                details.append(merge)
        if self.debug:
            logger.debug("Details on Load Balancers and Listeners")
            pprint(details)
        logger.info(f"There is {len(details)} listeners across {len(lbs)} load balancers")
        return details

    def elb_describe_rules(self, listener_arn):
        """Collects information on rules in LB Listener

        Args:
            listener_arn (string): Unique ARN ID of LB Listener

        Returns:
            list: of dictionaries on listeners
        """
        response = self._elb.describe_rules(ListenerArn=listener_arn)
        if self.debug:
            pprint(response)
        logger.info(f"There is {len(response.get('Rules'))} rules(s) in search query")
        return response.get("Rules")

    def elb_get_listeners(self, lb_arn):
        """Collects information on listeners in LB

        Args:
            lb_arn (string): Unique ARN ID of LB

        Returns:
            list: of dictionaries on listeners
        """
        response = self._elb.describe_listeners(LoadBalancerArn=lb_arn)
        if self.debug:
            logger.debug("get response:")
            pprint(response.get("Listeners"))
        logger.info(f"There is {len(response.get('Listeners'))} listener(s) in search query")
        return response.get("Listeners")

    def elb_get_loadbalancers(self, lb_name=None):
        """Collects information on all load balancers

        Args:
            lb_name (string): optional field to get single LB

        Returns:
            list: of dictionaries on loadbalancers
        """
        if lb_name:
            response = self._elb.describe_load_balancers(Names=[lb_name])
        else:
            response = self._elb.describe_load_balancers()
        if self.debug:
            pprint(response.get("LoadBalancers"))
        logger.info(f"There is {len(response.get('LoadBalancers'))} loadbalancer(s) in search query")
        return response.get("LoadBalancers")

    def elb_modify_listener(
        self, listener_arn, action, fixed_message=None, forward_target_arn=None, redirect_port="80", redirect_url=None
    ):
        """Modifies the default action of a listener

        Args:
            listener_arn (string): ID of LB Listener
            action (string): Must be one of redirect/fixed/forward
            fixed_message (string): if action=fixed, the message to display
            forward_target_arn (string): if action=forward, the ASG target
            redirect_port (string): if action=redirect with string of destination port
            redirect_url (string): if action=redirect the url to 302 redirect to
        """
        if action == "redirect":
            if not redirect_url:
                logger.error(f"redirect_url is: {redirect_url}")
                sys.exit(1)
            if str(redirect_port) == "443":
                protocol = "HTTPS"
            else:
                protocol = "HTTP"
            response = self._elb.modify_listener(
                ListenerArn=listener_arn,
                DefaultActions=[
                    {
                        "Type": "redirect",
                        "RedirectConfig": {
                            "Protocol": protocol,
                            "Port": str(redirect_port),
                            "Host": redirect_url,
                            "Path": "/",
                            "Query": "",
                            "StatusCode": "HTTP_302",
                        },
                    },
                ],
            )
        elif action == "fixed":
            if not fixed_message:
                logger.error(f"fixed_message is: {fixed_message}")
                sys.exit(1)
            response = self._elb.modify_listener(
                ListenerArn=listener_arn,
                DefaultActions=[
                    {
                        "Type": "fixed-response",
                        "FixedResponseConfig": {
                            "MessageBody": fixed_message,
                            "StatusCode": "200",
                            "ContentType": "text/html",
                        },
                    }
                ],
            )
        elif action == "forward":
            if not forward_target_arn:
                logger.error(f"forward_target_arn is: {forward_target_arn}")
                sys.exit(1)
            response = self._elb.modify_listener(
                ListenerArn=listener_arn,
                DefaultActions=[
                    {
                        "Type": "forward",
                        "ForwardConfig": {
                            "TargetGroups": [
                                {"TargetGroupArn": forward_target_arn, "Weight": 1},
                            ],
                            "TargetGroupStickinessConfig": {"Enabled": False},
                        },
                    }
                ],
            )
        else:
            logger.error("This function should have never gotten here")
            sys.exit(1)
        if self.debug:
            pprint(response)
        response_code = response.get("ResponseMetadata").get("HTTPStatusCode")
        if response_code != 200:
            logger.error(f"response_code returned {response_code}")
            sys.exit(1)
        else:
            logger.info(f"Updating listener returned status code: {response_code}")

    def pprint_auto_scaling_groups(self):
        """Displays pretty table of Autoscaling Groups"""
        fields = ["Name", "Current Size", "Min Size", "Max Size", "Target"]
        alignments = {"Name": "l"}
        pt = hf.pretty_table_create(fields, **alignments)
        asgs = self.autoscaling_filter_groups()
        for i in asgs:
            pt.add_row(
                [
                    i["asg_name"],
                    i["asg_size_current"],
                    i["asg_size_min"],
                    i["asg_size_max"],
                    i["asg_target_group"],
                ]
            )
        hf.pretty_table_print(pt, sort_key="Name")

    def pprint_dns_records(self):
        """Displays tables on A and CNAME records"""
        fields = ["Type", "Name", "Destination"]
        alignments = {"Name": "l", "Destination": "l"}
        # NOTE: `ar` is "A records", `cr` is "CNAME records", `lr` is "Alias records"
        ar = hf.pretty_table_create(fields, **alignments)
        cr = hf.pretty_table_create(fields, **alignments)
        lr = hf.pretty_table_create(fields, **alignments)

        response = self.dns_zone_records()
        for i in response["ResourceRecordSets"]:
            if i["Type"] == "A" and i.get("ResourceRecords"):
                ar.add_row([i["Type"], i["Name"], i["ResourceRecords"][0]["Value"]])
            if i["Type"] == "CNAME":
                cr.add_row([i["Type"], i["Name"], i["ResourceRecords"][0]["Value"]])
            # print alias type records
            if i["Type"] == "A" and i.get("AliasTarget"):
                lr.add_row([i["Type"], i["Name"], i["AliasTarget"]["DNSName"]])

        hf.pretty_table_print(cr, sort_key="Name")
        hf.pretty_table_print(ar, sort_key="Name")
        hf.pretty_table_print(lr, sort_key="Name")

    def pprint_ec2_instances(self):
        """Displays pretty table of EC2 Instances"""
        fields = ["Group", "Name", "ID", "State", "Type", "Internal IP", "Public IP", "IPv6", "Image", "AZ"]
        alignments = {"Group": "l", "Name": "l"}
        pt = hf.pretty_table_create(fields, **alignments)
        instances = self.ec2_get_instances()
        for i in instances:
            pt.add_row(
                [
                    i["group_name"],
                    i["instance_name"],
                    i["instance_id"],
                    i["instance_state"],
                    i["instance_type"],
                    i["instance_private_ip"],
                    i["instance_public_ip"],
                    i["ipv6"],
                    i["image_name"],
                    i["instance_az"],
                ]
            )
        hf.pretty_table_print(pt, sort_key="Name")

    def pprint_load_balancers(self):
        """Displays pretty table of Elastic Load Balancers"""
        fields = ["Name", "Type", "DNS", "Protocol", "Port", "Default Action"]
        alignments = {"Name": "l"}
        pt = hf.pretty_table_create(fields, **alignments)
        details = self.elb_aggregate()
        for i in details:
            pt.add_row(
                [
                    i["lb_name"],
                    i["lb_type"],
                    i["lb_dns"],
                    i["listener_protocal"],
                    i["listener_port"],
                    i["listener_default_action_type"],
                ]
            )
        hf.pretty_table_print(pt, sort_key="Name")

    def tag_get_value(self, tags, label, default_value=None):
        """Gets value of specific key from list of tags

        Args:
            tags (list): list of key-value pairs
            label (string): the key of the value you want
            default_value (string): if you need to return a default string if no tag present

        Returns:
            string: of value if successful, otherwise return default_value
        """
        if tags:
            for tag in tags:
                if tag["Key"] == label:
                    return tag["Value"]
        return default_value

    def get_elb_by_tag(self, key="Name", value="tag-linecas-eks-ingress-nginx"):
        """Used to get a specific ELB based upon key-value pairs

        Args:
            key (string): which key to get
            value (string): the value of the key

        Returns:
            dict: the full response returned by AWS
            dict: a subset of specific items we desire
        """
        lb = {}
        response = self._tags.get_resources(
            # PaginationToken=token,
            TagFilters=[{"Key": key, "Values": [value]}],
            ResourcesPerPage=50,
            ResourceTypeFilters=[
                "elasticloadbalancing:loadbalancer",
            ],
        )
        if self.debug:
            pprint(response)
        # TODO: analyze for loop
        for i in response.get("ResourceTagMappingList"):
            lb["arn"] = i["ResourceARN"]
            lb["name"] = i["ResourceARN"].rsplit("/", 1)[-1]
        logger.info(f"LB: {lb}")
        return response, lb

    def elb_get_lb(self, arn=None, name="ad5e2189ea4a34164901c67732d0657c"):
        """Get details on LB based on ARN or Name

        Args:
            arn (string): Unique ARN string
            name (string): Name of the LB

        Returns:
            dict: Detailed response of LB
            string: The DNS record value
        """
        dns_name = None
        if arn:
            # NOTE: was having problems with ARN, need to evaluate further
            logger.info(f"Using LB ARN: {arn}")
            response = self._elb_v1.describe_load_balancers(LoadBalancerArns=[arn])
        else:
            logger.info(f"Using LB Name: {name}")
            response = self._elb_v1.describe_load_balancers(LoadBalancerNames=[name])
        if self.debug:
            pprint(response)
        for i in response.get("LoadBalancerDescriptions"):
            dns_name = i["DNSName"]
            logger.info(f"DNS Name: {dns_name}")
        return response, dns_name

    def get_billing_amount(self, start_date="2021-01-01", end_date="2021-02-01"):
        """Calls cost_and_usage API to get billing information, and adds to Pretty Table

        Args:
            start_date (string): In format "2021-01-01"
            end_date (string): In format "2021-02-01"

        Returns:
            float: of bill amount (though this is not actively used)
        """
        response = self._ce.get_cost_and_usage(
            TimePeriod={"Start": start_date, "End": end_date},
            Granularity="MONTHLY",
            Metrics=["BlendedCost"],
            GroupBy=[
                {"Type": "TAG", "Key": "Project"},
            ],
        )
        if self.debug:
            pprint(response)
        logger.debug(f"get_cost_and_usage response: {response}")

        amount = response.get("ResultsByTime")[0].get("Groups")[0].get("Metrics").get("BlendedCost").get("Amount")
        bill_amount = round(Decimal(amount), 2)
        logger.info(f"BILL_AMOUNT: {bill_amount}, START_DATE={start_date}, END_DATE={end_date}")
        self.bt.add_row([start_date, end_date, bill_amount])
        return bill_amount

    def get_billing_month(self, delta):
        """Gets relative billing date

        Args:
            delta (int): Relative month time period to current date

        Returns:
            string: In format YYYY-MM-01
        """
        logger.debug(f"MONTH_DELTA: {delta}")
        bill_date = date.today() + relativedelta(months=delta)
        return_date = bill_date.strftime("%Y-%m-01")
        logger.debug(f"RETURN_DATE: {return_date}")
        return return_date

    def billing_function(self):
        """Gets current and last 3 bill amounts"""
        month_plus_1 = self.get_billing_month(+1)
        month_current = self.get_billing_month(0)
        month_minus_1 = self.get_billing_month(-1)
        month_minus_2 = self.get_billing_month(-2)
        month_minus_3 = self.get_billing_month(-3)

        # Billing Table
        fields = ["Start Date", "End Date", "Bill Amount"]
        self.bt = hf.pretty_table_create(fields)

        self.get_billing_amount(month_current, month_plus_1)
        self.get_billing_amount(month_minus_1, month_current)
        self.get_billing_amount(month_minus_2, month_minus_1)
        self.get_billing_amount(month_minus_3, month_minus_2)

        hf.pretty_table_print(self.bt)

    def get_tag_value(self, tags, label):
        """Gets value of specific key from list of tags

        Args:
            tags (list): list of key-value pairs
            label (string): the key of the value you want

        Returns:
            string: of value if successful, None otherwise
        """
        for tag in tags:
            if tag["Key"] == label:
                return tag["Value"]
        # Default return
        return None

__init__(self, domain='linecas.com', debug=False) special

Sets up the clients and DNS information for interaction with AWS

Parameters:

Name Type Description Default
domain string

DNS zone to use. If None, default is linecas.com

'linecas.com'
debug bool

Used for pprint and log debugging

False
Source code in linecaspy/aws/aws.py
def __init__(self, domain="linecas.com", debug=False):
    """Sets up the clients and DNS information for interaction with AWS

    Args:
        domain (string): DNS zone to use.  If None, default is linecas.com
        debug (bool): Used for pprint and log debugging
    """
    self.debug = debug
    self.ami_image_cache = []
    logger.info(f"Intializing the AWS Class using Domain: {domain} with debug: {debug}")
    # Clients
    self._as = boto3.client("autoscaling")
    self._ce = boto3.client("ce")
    self._ec2 = boto3.client("ec2")
    self._eks = boto3.client("eks")
    self._elb_v1 = boto3.client("elb")  # NOTE: not elbv2
    self._elb = boto3.client("elbv2")  # NOTE: not elb
    self._r53 = boto3.client("route53")
    self._tags = boto3.client("resourcegroupstaggingapi")
    # DNS
    self.dns_zone_name = domain
    self.dns_zone_id = None
    self.dns_ttl = 90

ami_get_friendly_name(self, ami_id, words=3)

Gets a shortened name of Image used

Parameters:

Name Type Description Default
ami_id string

ID of the AMI

required
words int

of the first X amount of strings in name

3

Returns:

Type Description
string

a human comprehensible name

Source code in linecaspy/aws/aws.py
def ami_get_friendly_name(self, ami_id, words=3):
    """Gets a shortened name of Image used

    Args:
        ami_id (string): ID of the AMI
        words (int): of the first X amount of strings in name

    Returns:
        string: a human comprehensible name
    """
    # first check if we already looked up this AMI ID
    for ami in self.ami_image_cache:
        for key, value in ami.items():
            if key == ami_id:
                friendly_name = value
                logger.debug(f"Found {key} with {value}")
                return friendly_name
    # if not found, then we'll make an API call to discover
    response = self._ec2.describe_images(ImageIds=[ami_id])
    logger.debug(f"RESPONSE: {response}")
    friendly_name = None
    images = response.get("Images")
    for image in images:
        image_name = image.get("Name")
        logger.debug(f"IMAGE_NAME: {image_name}")
        friendly_name = " ".join(image_name.split()[: int(words)])
        # and add that discovery to the image cache
        image_dict = {ami_id: friendly_name}
        self.ami_image_cache.append(image_dict)
    return friendly_name

autoscaling_filter_groups(self)

Filters on specific subset of information in autoscaling groups

Returns:

Type Description
list

of dictionaries on autoscaling groups

Source code in linecaspy/aws/aws.py
def autoscaling_filter_groups(self):
    """Filters on specific subset of information in autoscaling groups

    Returns:
        list: of dictionaries on autoscaling groups
    """
    all_asgs = []
    asgs = self.autoscaling_get_groups()
    for asg in asgs:
        i = {}
        i["asg_arn"] = asg.get("AutoScalingGroupARN")
        i["asg_name"] = asg.get("AutoScalingGroupName")
        i["asg_size_current"] = len(asg.get("Instances"))
        i["asg_size_max"] = asg.get("MaxSize")
        i["asg_size_min"] = asg.get("MinSize")
        try:
            # NOTE: assumes ASG will have single target
            i["asg_target_group"] = asg.get("TargetGroupARNs")[0]
        except IndexError:
            i["asg_target_group"] = None
        all_asgs.append(i)
    if self.debug:
        logger.debug("filtered response:")
        pprint(all_asgs)
    return all_asgs

autoscaling_get_groups(self, asg_name=None)

Collects information on all autoscaling groups

Parameters:

Name Type Description Default
asg_name string

name of specific Autoscaling Group

None

Returns:

Type Description
list

of dictionaries on autoscaling groups

Source code in linecaspy/aws/aws.py
def autoscaling_get_groups(self, asg_name=None):
    """Collects information on all autoscaling groups

    Args:
        asg_name (string): name of specific Autoscaling Group

    Returns:
        list: of dictionaries on autoscaling groups
    """
    if asg_name:
        response = self._as.describe_auto_scaling_groups(AutoScalingGroupNames=[asg_name])
    else:
        response = self._as.describe_auto_scaling_groups()
    if self.debug:
        logger.debug("get response:")
        pprint(response.get("AutoScalingGroups"))
    logger.info(f"There is {len(response.get('AutoScalingGroups'))} Autoscaling-groups(s) in search query")
    return response.get("AutoScalingGroups")

billing_function(self)

Gets current and last 3 bill amounts

Source code in linecaspy/aws/aws.py
def billing_function(self):
    """Gets current and last 3 bill amounts"""
    month_plus_1 = self.get_billing_month(+1)
    month_current = self.get_billing_month(0)
    month_minus_1 = self.get_billing_month(-1)
    month_minus_2 = self.get_billing_month(-2)
    month_minus_3 = self.get_billing_month(-3)

    # Billing Table
    fields = ["Start Date", "End Date", "Bill Amount"]
    self.bt = hf.pretty_table_create(fields)

    self.get_billing_amount(month_current, month_plus_1)
    self.get_billing_amount(month_minus_1, month_current)
    self.get_billing_amount(month_minus_2, month_minus_1)
    self.get_billing_amount(month_minus_3, month_minus_2)

    hf.pretty_table_print(self.bt)

dns_delete_record_function(self, name)

Handler that calls the "dns_zone_records" and "dns_delete_records" functions

Parameters:

Name Type Description Default
name string

name of DNS record, will be expanded to FQDN

required
Source code in linecaspy/aws/aws.py
def dns_delete_record_function(self, name):
    """Handler that calls the "dns_zone_records" and "dns_delete_records" functions

    Args:
        name (string): name of DNS record, will be expanded to FQDN
    """
    fqdn = f"{name}.{self.dns_zone_name}."
    logger.info(f"Looking for record with name: {fqdn}")
    response = self.dns_zone_records()
    for i in response["ResourceRecordSets"]:
        if i["Name"] == fqdn:
            logger.info(f"Deleting DNS Record: {i}")
            d_name = i["Name"]
            d_type = i["Type"]
            d_ttl = i["TTL"]
            d_rr = i["ResourceRecords"]
            self.dns_delete_records(d_name, d_type, d_ttl, d_rr)

dns_delete_records(self, name, type_r, ttl, resources)

Performs the deletion of DNS record

Parameters:

Name Type Description Default
name string

Name of record

required
type_r string

Type of record like A or CNAME

required
ttl string

Time To Live value

required
resources list

Destination ResourceRecords

required
Source code in linecaspy/aws/aws.py
def dns_delete_records(self, name, type_r, ttl, resources):
    """Performs the deletion of DNS record

    Args:
        name (string): Name of record
        type_r (string): Type of record like A or CNAME
        ttl (string): Time To Live value
        resources (list): Destination ResourceRecords
    """
    logger.info(f"Received deletion request for {name}")
    response = self._r53.change_resource_record_sets(
        ChangeBatch={
            "Changes": [
                {
                    "Action": "DELETE",
                    "ResourceRecordSet": {
                        "Name": name,
                        "ResourceRecords": resources,
                        "Type": type_r,
                        "TTL": ttl,
                    },
                },
            ]
        },
        HostedZoneId=self.dns_zone_id,
    )
    if self.debug:
        pprint(response)
    logger.info(f"RESPONSE: {response}")

dns_upsert_record(self, source, dest, r_type='CNAME')

Creates or updates DNS record

Parameters:

Name Type Description Default
source string

DNS value you want to map

required
dest string

DNS value you want to map to

required
r_type string

DNS record type, typically 'A' or 'CNAME'

'CNAME'

Returns:

Type Description
dict

response of DNS event

Source code in linecaspy/aws/aws.py
def dns_upsert_record(self, source, dest, r_type="CNAME"):
    """Creates or updates DNS record

    Args:
        source (string): DNS value you want to map
        dest (string): DNS value you want to map to
        r_type (string): DNS record type, typically 'A' or 'CNAME'

    Returns:
        dict: response of DNS event
    """
    if self.dns_zone_id is None:
        self.dns_zone_id = self.dns_zones(self.dns_zone_name)
    response = self._r53.change_resource_record_sets(
        HostedZoneId=self.dns_zone_id,
        ChangeBatch={
            "Comment": f"add {source} -> {dest}",
            "Changes": [
                {
                    "Action": "UPSERT",
                    "ResourceRecordSet": {
                        "Name": source,
                        "Type": r_type,
                        "TTL": self.dns_ttl,
                        "ResourceRecords": [{"Value": dest}],
                    },
                }
            ],
        },
    )
    if self.debug:
        pprint(response)
    logger.info(f"RESPONSE: {response}")
    return response

dns_zone_records(self)

Gets all DNS records in Zone

Returns:

Type Description
(dict)

of DNS Records

Source code in linecaspy/aws/aws.py
def dns_zone_records(self):
    """Gets all DNS records in Zone

    Returns:
        (dict): of DNS Records
    """
    if self.dns_zone_id is None:
        self.dns_zone_id = self.dns_zones(self.dns_zone_name)
    response = self._r53.list_resource_record_sets(HostedZoneId=self.dns_zone_id)
    if self.debug:
        pprint(response)
    return response

dns_zones(self, domain)

Logs information and returns information on DNS Zones

Parameters:

Name Type Description Default
domain string

The DNS zone name, like "linecas.com"

required

Returns:

Type Description
(string)

The DNS Zone ID

Source code in linecaspy/aws/aws.py
def dns_zones(self, domain):
    """Logs information and returns information on DNS Zones

    Args:
        domain (string): The DNS zone name, like "linecas.com"

    Returns:
        (string): The DNS Zone ID
    """
    zones = self._r53.list_hosted_zones()
    if self.debug:
        pprint(zones)
    for i in zones["HostedZones"]:
        logger.info(f"{i['Name']} has {i['ResourceRecordSetCount']} records")
        if i["Name"] == f"{domain}.":
            zone_id = i["Id"]
    logger.info(f"Domain Name: {domain} has Zone ID: {zone_id}")
    self.dns_zone_id = zone_id
    return zone_id

ec2_get_instances(self)

Collects information on all ec2 instances

Returns:

Type Description
list

of dictionaries on instances

Source code in linecaspy/aws/aws.py
def ec2_get_instances(self):
    """Collects information on all ec2 instances

    Returns:
        list: of dictionaries on instances
    """
    all_instances = []
    response = self._ec2.describe_instances()
    for reservation in response["Reservations"]:
        for instance in reservation["Instances"]:
            if self.debug:
                pprint(instance)
            i = {}
            i["instance_tags"] = instance.get("Tags")
            i["group_name"] = self.tag_get_value(i["instance_tags"], label="ansible_group", default_value="N/A")
            i["instance_name"] = self.tag_get_value(i["instance_tags"], label="Name", default_value="N/A")
            i["instance_id"] = instance.get("InstanceId")
            i["instance_state"] = instance.get("State").get("Name")
            i["instance_type"] = instance.get("InstanceType")
            i["instance_private_ip"] = instance.get("PrivateIpAddress")
            i["instance_public_ip"] = instance.get("PublicIpAddress", None)
            try:
                i["ipv6"] = instance.get("NetworkInterfaces")[0].get("Ipv6Addresses")[0].get("Ipv6Address")
            except IndexError:
                i["ipv6"] = None
            i["image_id"] = instance.get("ImageId")
            i["image_name"] = self.ami_get_friendly_name(i["image_id"])
            i["instance_az"] = instance.get("Placement").get("AvailabilityZone")
            all_instances.append(i)
    if self.debug:
        pprint(all_instances)
    logger.info(f"There is {len(all_instances)} total instances")
    return all_instances

ec2_start_instance(self, instance_id)

Starts an EC2 instance based on AWS starter code

Parameters:

Name Type Description Default
instance_id string

The value of the EC2 Instane ID

required
Source code in linecaspy/aws/aws.py
def ec2_start_instance(self, instance_id):
    """Starts an EC2 instance based on AWS starter code

    Args:
        instance_id (string): The value of the EC2 Instane ID
    """
    # Do a dryrun first to verify permissions
    try:
        self._ec2.start_instances(InstanceIds=[instance_id], DryRun=True)
    except ClientError as e:
        if "DryRunOperation" not in str(e):
            raise

    # Dry run succeeded, run start_instances without dryrun
    try:
        response = self._ec2.start_instances(InstanceIds=[instance_id], DryRun=False)
        logger.info(f"RESPONSE: {response}")
    except ClientError as e:
        logger.error(f"ERROR: {e}")
        sys.exit(1)

eks_function(self)

Configures and returns information about EKS Cluster

EKS could be in States: 1) Does not exist 2) Exists but without LB (rarity) 3) Fully exists

Source code in linecaspy/aws/aws.py
def eks_function(self):
    """Configures and returns information about EKS Cluster

    EKS could be in States:
      1) Does not exist
      2) Exists but without LB (rarity)
      3) Fully exists
    """
    cluster = self.eks_get_cluster()
    if cluster:
        # setup pretty table
        fields = ["Key", "Value"]
        alignments = {"Key": "l", "Value": "l"}
        pt = hf.pretty_table_create(fields, **alignments)
        # get data
        lb_tag_resp, lb_tag_dict = self.get_elb_by_tag()
        lb, lb_dns = self.elb_get_lb(arn=None, name=lb_tag_dict.get("name"))
        self.dns_upsert_record(source="eks.linecas.com", dest=lb_dns)
        # add info to table
        pt.add_row(["eks_name", cluster.get("cluster").get("name")])
        pt.add_row(["eks_version", cluster.get("cluster").get("version")])
        pt.add_row(["eks_endpoint", cluster.get("cluster").get("endpoint")])
        pt.add_row(["ingress_lb_name", lb_tag_dict.get("name")])
        pt.add_row(["ingress_lb_dns", lb_dns])
        hf.pretty_table_print(pt, sort_key="Key")
    else:
        return

eks_get_cluster(self, cluster='linecas-eks')

Gets details on EKS Cluster

Parameters:

Name Type Description Default
cluster string

name of the EKS cluster

'linecas-eks'

Returns:

Type Description
(dict)

response of cluster details otherwise returns None

Source code in linecaspy/aws/aws.py
def eks_get_cluster(self, cluster="linecas-eks"):
    """Gets details on EKS Cluster

    Args:
        cluster (string): name of the EKS cluster

    Returns:
        (dict): response of cluster details otherwise returns None
    """
    logger.info(f"Getting info on EKS Cluster: {cluster}")
    try:
        response = self._eks.describe_cluster(name=cluster)
    except self._eks.exceptions.ResourceNotFoundException:
        logger.warning(f"EKS Cluster: {cluster} does not exist")
        response = None
    if self.debug and response:
        pprint(response)
    return response

elb_aggregate(self)

Aggregates Load Balancer and Listener configurations

Returns:

Type Description
list

of dictionaries on lbs and listeners

Source code in linecaspy/aws/aws.py
def elb_aggregate(self):
    """Aggregates Load Balancer and Listener configurations

    Returns:
        list: of dictionaries on lbs and listeners
    """
    all_lbs = []
    details = []
    lbs = self.elb_get_loadbalancers()
    for lb in lbs:
        i = {}
        i["lb_arn"] = lb.get("LoadBalancerArn")
        i["lb_dns"] = lb.get("DNSName")
        i["lb_name"] = lb.get("LoadBalancerName")
        i["lb_type"] = lb.get("Type")
        all_lbs.append(i)
    for lb in all_lbs:
        listeners = self.elb_get_listeners(lb["lb_arn"])
        for listener in listeners:
            j = {}
            j["listener_arn"] = listener.get("ListenerArn")
            j["listener_default_action"] = listener.get("DefaultActions")[0]
            j["listener_default_action_type"] = listener.get("DefaultActions")[0].get("Type")
            j["listener_port"] = listener.get("Port")
            j["listener_protocal"] = listener.get("Protocol")
            # combine the lb and listener dictionaries next
            merge = {**lb, **j}
            details.append(merge)
    if self.debug:
        logger.debug("Details on Load Balancers and Listeners")
        pprint(details)
    logger.info(f"There is {len(details)} listeners across {len(lbs)} load balancers")
    return details

elb_describe_rules(self, listener_arn)

Collects information on rules in LB Listener

Parameters:

Name Type Description Default
listener_arn string

Unique ARN ID of LB Listener

required

Returns:

Type Description
list

of dictionaries on listeners

Source code in linecaspy/aws/aws.py
def elb_describe_rules(self, listener_arn):
    """Collects information on rules in LB Listener

    Args:
        listener_arn (string): Unique ARN ID of LB Listener

    Returns:
        list: of dictionaries on listeners
    """
    response = self._elb.describe_rules(ListenerArn=listener_arn)
    if self.debug:
        pprint(response)
    logger.info(f"There is {len(response.get('Rules'))} rules(s) in search query")
    return response.get("Rules")

elb_get_lb(self, arn=None, name='ad5e2189ea4a34164901c67732d0657c')

Get details on LB based on ARN or Name

Parameters:

Name Type Description Default
arn string

Unique ARN string

None
name string

Name of the LB

'ad5e2189ea4a34164901c67732d0657c'

Returns:

Type Description
dict

Detailed response of LB string: The DNS record value

Source code in linecaspy/aws/aws.py
def elb_get_lb(self, arn=None, name="ad5e2189ea4a34164901c67732d0657c"):
    """Get details on LB based on ARN or Name

    Args:
        arn (string): Unique ARN string
        name (string): Name of the LB

    Returns:
        dict: Detailed response of LB
        string: The DNS record value
    """
    dns_name = None
    if arn:
        # NOTE: was having problems with ARN, need to evaluate further
        logger.info(f"Using LB ARN: {arn}")
        response = self._elb_v1.describe_load_balancers(LoadBalancerArns=[arn])
    else:
        logger.info(f"Using LB Name: {name}")
        response = self._elb_v1.describe_load_balancers(LoadBalancerNames=[name])
    if self.debug:
        pprint(response)
    for i in response.get("LoadBalancerDescriptions"):
        dns_name = i["DNSName"]
        logger.info(f"DNS Name: {dns_name}")
    return response, dns_name

elb_get_listeners(self, lb_arn)

Collects information on listeners in LB

Parameters:

Name Type Description Default
lb_arn string

Unique ARN ID of LB

required

Returns:

Type Description
list

of dictionaries on listeners

Source code in linecaspy/aws/aws.py
def elb_get_listeners(self, lb_arn):
    """Collects information on listeners in LB

    Args:
        lb_arn (string): Unique ARN ID of LB

    Returns:
        list: of dictionaries on listeners
    """
    response = self._elb.describe_listeners(LoadBalancerArn=lb_arn)
    if self.debug:
        logger.debug("get response:")
        pprint(response.get("Listeners"))
    logger.info(f"There is {len(response.get('Listeners'))} listener(s) in search query")
    return response.get("Listeners")

elb_get_loadbalancers(self, lb_name=None)

Collects information on all load balancers

Parameters:

Name Type Description Default
lb_name string

optional field to get single LB

None

Returns:

Type Description
list

of dictionaries on loadbalancers

Source code in linecaspy/aws/aws.py
def elb_get_loadbalancers(self, lb_name=None):
    """Collects information on all load balancers

    Args:
        lb_name (string): optional field to get single LB

    Returns:
        list: of dictionaries on loadbalancers
    """
    if lb_name:
        response = self._elb.describe_load_balancers(Names=[lb_name])
    else:
        response = self._elb.describe_load_balancers()
    if self.debug:
        pprint(response.get("LoadBalancers"))
    logger.info(f"There is {len(response.get('LoadBalancers'))} loadbalancer(s) in search query")
    return response.get("LoadBalancers")

elb_modify_listener(self, listener_arn, action, fixed_message=None, forward_target_arn=None, redirect_port='80', redirect_url=None)

Modifies the default action of a listener

Parameters:

Name Type Description Default
listener_arn string

ID of LB Listener

required
action string

Must be one of redirect/fixed/forward

required
fixed_message string

if action=fixed, the message to display

None
forward_target_arn string

if action=forward, the ASG target

None
redirect_port string

if action=redirect with string of destination port

'80'
redirect_url string

if action=redirect the url to 302 redirect to

None
Source code in linecaspy/aws/aws.py
def elb_modify_listener(
    self, listener_arn, action, fixed_message=None, forward_target_arn=None, redirect_port="80", redirect_url=None
):
    """Modifies the default action of a listener

    Args:
        listener_arn (string): ID of LB Listener
        action (string): Must be one of redirect/fixed/forward
        fixed_message (string): if action=fixed, the message to display
        forward_target_arn (string): if action=forward, the ASG target
        redirect_port (string): if action=redirect with string of destination port
        redirect_url (string): if action=redirect the url to 302 redirect to
    """
    if action == "redirect":
        if not redirect_url:
            logger.error(f"redirect_url is: {redirect_url}")
            sys.exit(1)
        if str(redirect_port) == "443":
            protocol = "HTTPS"
        else:
            protocol = "HTTP"
        response = self._elb.modify_listener(
            ListenerArn=listener_arn,
            DefaultActions=[
                {
                    "Type": "redirect",
                    "RedirectConfig": {
                        "Protocol": protocol,
                        "Port": str(redirect_port),
                        "Host": redirect_url,
                        "Path": "/",
                        "Query": "",
                        "StatusCode": "HTTP_302",
                    },
                },
            ],
        )
    elif action == "fixed":
        if not fixed_message:
            logger.error(f"fixed_message is: {fixed_message}")
            sys.exit(1)
        response = self._elb.modify_listener(
            ListenerArn=listener_arn,
            DefaultActions=[
                {
                    "Type": "fixed-response",
                    "FixedResponseConfig": {
                        "MessageBody": fixed_message,
                        "StatusCode": "200",
                        "ContentType": "text/html",
                    },
                }
            ],
        )
    elif action == "forward":
        if not forward_target_arn:
            logger.error(f"forward_target_arn is: {forward_target_arn}")
            sys.exit(1)
        response = self._elb.modify_listener(
            ListenerArn=listener_arn,
            DefaultActions=[
                {
                    "Type": "forward",
                    "ForwardConfig": {
                        "TargetGroups": [
                            {"TargetGroupArn": forward_target_arn, "Weight": 1},
                        ],
                        "TargetGroupStickinessConfig": {"Enabled": False},
                    },
                }
            ],
        )
    else:
        logger.error("This function should have never gotten here")
        sys.exit(1)
    if self.debug:
        pprint(response)
    response_code = response.get("ResponseMetadata").get("HTTPStatusCode")
    if response_code != 200:
        logger.error(f"response_code returned {response_code}")
        sys.exit(1)
    else:
        logger.info(f"Updating listener returned status code: {response_code}")

get_billing_amount(self, start_date='2021-01-01', end_date='2021-02-01')

Calls cost_and_usage API to get billing information, and adds to Pretty Table

Parameters:

Name Type Description Default
start_date string

In format "2021-01-01"

'2021-01-01'
end_date string

In format "2021-02-01"

'2021-02-01'

Returns:

Type Description
float

of bill amount (though this is not actively used)

Source code in linecaspy/aws/aws.py
def get_billing_amount(self, start_date="2021-01-01", end_date="2021-02-01"):
    """Calls cost_and_usage API to get billing information, and adds to Pretty Table

    Args:
        start_date (string): In format "2021-01-01"
        end_date (string): In format "2021-02-01"

    Returns:
        float: of bill amount (though this is not actively used)
    """
    response = self._ce.get_cost_and_usage(
        TimePeriod={"Start": start_date, "End": end_date},
        Granularity="MONTHLY",
        Metrics=["BlendedCost"],
        GroupBy=[
            {"Type": "TAG", "Key": "Project"},
        ],
    )
    if self.debug:
        pprint(response)
    logger.debug(f"get_cost_and_usage response: {response}")

    amount = response.get("ResultsByTime")[0].get("Groups")[0].get("Metrics").get("BlendedCost").get("Amount")
    bill_amount = round(Decimal(amount), 2)
    logger.info(f"BILL_AMOUNT: {bill_amount}, START_DATE={start_date}, END_DATE={end_date}")
    self.bt.add_row([start_date, end_date, bill_amount])
    return bill_amount

get_billing_month(self, delta)

Gets relative billing date

Parameters:

Name Type Description Default
delta int

Relative month time period to current date

required

Returns:

Type Description
string

In format YYYY-MM-01

Source code in linecaspy/aws/aws.py
def get_billing_month(self, delta):
    """Gets relative billing date

    Args:
        delta (int): Relative month time period to current date

    Returns:
        string: In format YYYY-MM-01
    """
    logger.debug(f"MONTH_DELTA: {delta}")
    bill_date = date.today() + relativedelta(months=delta)
    return_date = bill_date.strftime("%Y-%m-01")
    logger.debug(f"RETURN_DATE: {return_date}")
    return return_date

get_elb_by_tag(self, key='Name', value='tag-linecas-eks-ingress-nginx')

Used to get a specific ELB based upon key-value pairs

Parameters:

Name Type Description Default
key string

which key to get

'Name'
value string

the value of the key

'tag-linecas-eks-ingress-nginx'

Returns:

Type Description
dict

the full response returned by AWS dict: a subset of specific items we desire

Source code in linecaspy/aws/aws.py
def get_elb_by_tag(self, key="Name", value="tag-linecas-eks-ingress-nginx"):
    """Used to get a specific ELB based upon key-value pairs

    Args:
        key (string): which key to get
        value (string): the value of the key

    Returns:
        dict: the full response returned by AWS
        dict: a subset of specific items we desire
    """
    lb = {}
    response = self._tags.get_resources(
        # PaginationToken=token,
        TagFilters=[{"Key": key, "Values": [value]}],
        ResourcesPerPage=50,
        ResourceTypeFilters=[
            "elasticloadbalancing:loadbalancer",
        ],
    )
    if self.debug:
        pprint(response)
    # TODO: analyze for loop
    for i in response.get("ResourceTagMappingList"):
        lb["arn"] = i["ResourceARN"]
        lb["name"] = i["ResourceARN"].rsplit("/", 1)[-1]
    logger.info(f"LB: {lb}")
    return response, lb

get_tag_value(self, tags, label)

Gets value of specific key from list of tags

Parameters:

Name Type Description Default
tags list

list of key-value pairs

required
label string

the key of the value you want

required

Returns:

Type Description
string

of value if successful, None otherwise

Source code in linecaspy/aws/aws.py
def get_tag_value(self, tags, label):
    """Gets value of specific key from list of tags

    Args:
        tags (list): list of key-value pairs
        label (string): the key of the value you want

    Returns:
        string: of value if successful, None otherwise
    """
    for tag in tags:
        if tag["Key"] == label:
            return tag["Value"]
    # Default return
    return None

pprint_auto_scaling_groups(self)

Displays pretty table of Autoscaling Groups

Source code in linecaspy/aws/aws.py
def pprint_auto_scaling_groups(self):
    """Displays pretty table of Autoscaling Groups"""
    fields = ["Name", "Current Size", "Min Size", "Max Size", "Target"]
    alignments = {"Name": "l"}
    pt = hf.pretty_table_create(fields, **alignments)
    asgs = self.autoscaling_filter_groups()
    for i in asgs:
        pt.add_row(
            [
                i["asg_name"],
                i["asg_size_current"],
                i["asg_size_min"],
                i["asg_size_max"],
                i["asg_target_group"],
            ]
        )
    hf.pretty_table_print(pt, sort_key="Name")

pprint_dns_records(self)

Displays tables on A and CNAME records

Source code in linecaspy/aws/aws.py
def pprint_dns_records(self):
    """Displays tables on A and CNAME records"""
    fields = ["Type", "Name", "Destination"]
    alignments = {"Name": "l", "Destination": "l"}
    # NOTE: `ar` is "A records", `cr` is "CNAME records", `lr` is "Alias records"
    ar = hf.pretty_table_create(fields, **alignments)
    cr = hf.pretty_table_create(fields, **alignments)
    lr = hf.pretty_table_create(fields, **alignments)

    response = self.dns_zone_records()
    for i in response["ResourceRecordSets"]:
        if i["Type"] == "A" and i.get("ResourceRecords"):
            ar.add_row([i["Type"], i["Name"], i["ResourceRecords"][0]["Value"]])
        if i["Type"] == "CNAME":
            cr.add_row([i["Type"], i["Name"], i["ResourceRecords"][0]["Value"]])
        # print alias type records
        if i["Type"] == "A" and i.get("AliasTarget"):
            lr.add_row([i["Type"], i["Name"], i["AliasTarget"]["DNSName"]])

    hf.pretty_table_print(cr, sort_key="Name")
    hf.pretty_table_print(ar, sort_key="Name")
    hf.pretty_table_print(lr, sort_key="Name")

pprint_ec2_instances(self)

Displays pretty table of EC2 Instances

Source code in linecaspy/aws/aws.py
def pprint_ec2_instances(self):
    """Displays pretty table of EC2 Instances"""
    fields = ["Group", "Name", "ID", "State", "Type", "Internal IP", "Public IP", "IPv6", "Image", "AZ"]
    alignments = {"Group": "l", "Name": "l"}
    pt = hf.pretty_table_create(fields, **alignments)
    instances = self.ec2_get_instances()
    for i in instances:
        pt.add_row(
            [
                i["group_name"],
                i["instance_name"],
                i["instance_id"],
                i["instance_state"],
                i["instance_type"],
                i["instance_private_ip"],
                i["instance_public_ip"],
                i["ipv6"],
                i["image_name"],
                i["instance_az"],
            ]
        )
    hf.pretty_table_print(pt, sort_key="Name")

pprint_load_balancers(self)

Displays pretty table of Elastic Load Balancers

Source code in linecaspy/aws/aws.py
def pprint_load_balancers(self):
    """Displays pretty table of Elastic Load Balancers"""
    fields = ["Name", "Type", "DNS", "Protocol", "Port", "Default Action"]
    alignments = {"Name": "l"}
    pt = hf.pretty_table_create(fields, **alignments)
    details = self.elb_aggregate()
    for i in details:
        pt.add_row(
            [
                i["lb_name"],
                i["lb_type"],
                i["lb_dns"],
                i["listener_protocal"],
                i["listener_port"],
                i["listener_default_action_type"],
            ]
        )
    hf.pretty_table_print(pt, sort_key="Name")

tag_get_value(self, tags, label, default_value=None)

Gets value of specific key from list of tags

Parameters:

Name Type Description Default
tags list

list of key-value pairs

required
label string

the key of the value you want

required
default_value string

if you need to return a default string if no tag present

None

Returns:

Type Description
string

of value if successful, otherwise return default_value

Source code in linecaspy/aws/aws.py
def tag_get_value(self, tags, label, default_value=None):
    """Gets value of specific key from list of tags

    Args:
        tags (list): list of key-value pairs
        label (string): the key of the value you want
        default_value (string): if you need to return a default string if no tag present

    Returns:
        string: of value if successful, otherwise return default_value
    """
    if tags:
        for tag in tags:
            if tag["Key"] == label:
                return tag["Value"]
    return default_value