From 8b48716be81bec9f9f6fb291924be2267d2025d5 Mon Sep 17 00:00:00 2001
From: Cristian Magherusan-Stanciu
Date: Thu, 31 Oct 2024 13:53:33 +0100
Subject: [PATCH] Initial code for parallel EC2 scraping

It's much faster because we do a lot of things in parallel, but for now
it's still missing some pricing data.
---
 dedicated_price_collector.py | 178 ++++++++++++++++
 logging_setup.py             |  63 ++++++
 parallel_pricing.py          | 127 ++++++++++++
 parallel_scraper.py          | 166 +++++++++++++++
 scrape.py                    | 383 +++++++++++++++++++++--------------
 5 files changed, 760 insertions(+), 157 deletions(-)
 create mode 100644 dedicated_price_collector.py
 create mode 100644 logging_setup.py
 create mode 100644 parallel_pricing.py
 create mode 100644 parallel_scraper.py

diff --git a/dedicated_price_collector.py b/dedicated_price_collector.py
new file mode 100644
index 0000000..6d51db3
--- /dev/null
+++ b/dedicated_price_collector.py
@@ -0,0 +1,178 @@
+import requests
+import json
+import gzip
+import re
+import logging
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from typing import Dict, List, Any
+import threading
+from urllib.parse import quote
+
+class DedicatedPriceCollector:
+    def __init__(self, max_workers: int = 10):
+        self.max_workers = max_workers
+        self.logger = logging.getLogger('DedicatedPriceCollector')
+        self.lock = threading.Lock()
+
+    def _fetch_data(self, url: str) -> Dict:
+        """Helper method to fetch and parse JSON data from URLs"""
+        try:
+            # Timeout so a stalled request can't hang a worker thread
+            response = requests.get(url, timeout=30)
+            response.raise_for_status()  # Raises an HTTPError for bad responses
+            content = response.content
+
+            try:
+                content = content.decode()
+            except UnicodeDecodeError:
+                content = gzip.decompress(content).decode()
+
+            try:
+                data = json.loads(content)
+            except ValueError:
+                # If the data isn't valid JSON, try to parse it as JSONP
+                json_string = re.search(r"callback\((.*)\);", content).groups()[0]
+                json_string = re.sub(r"(\w+):", r'"\1":', json_string)
+                data = json.loads(json_string)
+
+            return data
+
+        except Exception as e:
+            self.logger.error(f"Error fetching data from {url}: {str(e)}")
+            raise
+
+    def _format_price(self, price: float) -> str:
+        """Helper method to format prices consistently"""
+        return str(float("%f" % float(price))).rstrip('0').rstrip('.')
+
+    def _fetch_ondemand_prices(self) -> Dict[str, Any]:
+        """Fetch on-demand pricing data for dedicated hosts"""
+        url = "https://b0.p.awsstatic.com/pricing/2.0/meteredUnitMaps/ec2/USD/current/dedicatedhost-ondemand.json"
+        try:
+            od_pricing = self._fetch_data(url)
+            all_pricing = {}
+
+            for region in od_pricing["regions"]:
+                all_pricing[region] = {}
+                for instance_description, dinst in od_pricing["regions"][region].items():
+                    _price = {"ondemand": self._format_price(dinst["price"]), "reserved": {}}
+                    all_pricing[region][dinst["Instance Type"]] = _price
+
+            return all_pricing
+        except Exception as e:
+            self.logger.error(f"Error fetching on-demand pricing: {str(e)}")
+            raise
+
+    def _fetch_reserved_price(self, params: Dict) -> List:
+        """Fetch reserved pricing data for a specific region/term/payment combination"""
+        region = params['region']
+        term = params['term']
+        payment = params['payment']
+
+        try:
+            base = "https://b0.p.awsstatic.com/pricing/2.0/meteredUnitMaps/ec2/USD/current/dedicatedhost-reservedinstance-virtual/"
+            path = f"{quote(region)}/{quote(term)}/{quote(payment)}/index.json"
+
+            pricing = self._fetch_data(base + path)
+            results = []
+
+            reserved_map = {
+                "1yrNoUpfront": "yrTerm1Standard.noUpfront",
+                "1yrPartialUpfront": "yrTerm1Standard.partialUpfront",
"1yrAllUpfront": "yrTerm1Standard.allUpfront", + "1 yrNoUpfront": "yrTerm1Standard.noUpfront", + "1 yrPartialUpfront": "yrTerm1Standard.partialUpfront", + "1 yrAllUpfront": "yrTerm1Standard.allUpfront", + "3yrNoUpfront": "yrTerm3Standard.noUpfront", + "3yrPartialUpfront": "yrTerm3Standard.partialUpfront", + "3yrAllUpfront": "yrTerm3Standard.allUpfront", + "3 yrNoUpfront": "yrTerm3Standard.noUpfront", + "3 yrPartialUpfront": "yrTerm3Standard.partialUpfront", + "3 yrAllUpfront": "yrTerm3Standard.allUpfront", + } + + for instance_description, dinst in pricing["regions"][region].items(): + upfront = 0.0 + if "Partial" in payment or "All" in payment: + upfront = float(dinst["riupfront:PricePerUnit"]) + + inst_type = dinst["Instance Type"] + ondemand = float(dinst["price"]) + lease_in_years = int(dinst["LeaseContractLength"][0]) + hours_in_term = lease_in_years * 365 * 24 + price = float(ondemand) + (float(upfront) / hours_in_term) + + translate_ri = reserved_map[ + dinst["LeaseContractLength"] + dinst["PurchaseOption"] + ] + + results.append({ + 'region': region, + 'instance_type': inst_type, + 'pricing_term': translate_ri, + 'price': self._format_price(price) + }) + + return results + + except Exception as e: + self.logger.warning( + f"Failed to fetch reserved pricing for region={region}, term={term}, " + f"payment={payment}: {str(e)}" + ) + return [] + + def _update_pricing_dict(self, all_pricing: Dict, result: Dict) -> None: + """Thread-safe update of the pricing dictionary""" + with self.lock: + region = result['region'] + inst_type = result['instance_type'] + pricing_term = result['pricing_term'] + price = result['price'] + + if inst_type not in all_pricing[region]: + all_pricing[region][inst_type] = {"reserved": {}} + + all_pricing[region][inst_type]["reserved"][pricing_term] = price + + def fetch_dedicated_prices(self) -> Dict: + """Fetch all dedicated host pricing data using parallel processing""" + self.logger.info("Starting dedicated price collection") + + # First fetch on-demand pricing + all_pricing = self._fetch_ondemand_prices() + + # Prepare parameters for reserved price fetching + fetch_params = [] + terms = ["3 year", "1 year"] + payments = ["No Upfront", "Partial Upfront", "All Upfront"] + + for region in all_pricing.keys(): + for term in terms: + for payment in payments: + fetch_params.append({ + 'region': region, + 'term': term, + 'payment': payment + }) + + # Fetch reserved pricing in parallel + with ThreadPoolExecutor(max_workers=self.max_workers) as executor: + future_to_params = { + executor.submit(self._fetch_reserved_price, params): params + for params in fetch_params + } + + for future in as_completed(future_to_params): + params = future_to_params[future] + try: + results = future.result() + for result in results: + self._update_pricing_dict(all_pricing, result) + except Exception as e: + self.logger.error( + f"Error processing results for region={params['region']}, " + f"term={params['term']}, payment={params['payment']}: {str(e)}" + ) + + self.logger.info("Completed dedicated price collection") + return all_pricing \ No newline at end of file diff --git a/logging_setup.py b/logging_setup.py new file mode 100644 index 0000000..7feeed7 --- /dev/null +++ b/logging_setup.py @@ -0,0 +1,63 @@ +import logging +import os +import sys +from datetime import datetime + +def setup_logging(log_level=logging.INFO): + """Configure logging for the AWS instance scraper""" + + # Create logs directory if it doesn't exist + if not os.path.exists('logs'): + os.makedirs('logs') + + # 
+    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
+    log_filename = f'logs/scraper_{timestamp}.log'
+
+    # Configure root logger
+    logger = logging.getLogger()
+    logger.setLevel(log_level)
+
+    # File handler with detailed formatting
+    file_handler = logging.FileHandler(log_filename)
+    file_formatter = logging.Formatter(
+        '%(asctime)s | %(levelname)-8s | %(name)s | %(message)s'
+    )
+    file_handler.setFormatter(file_formatter)
+    logger.addHandler(file_handler)
+
+    # Console handler with simpler formatting
+    console_handler = logging.StreamHandler(sys.stdout)
+    console_formatter = logging.Formatter(
+        '%(levelname)-8s: %(message)s'
+    )
+    console_handler.setFormatter(console_formatter)
+    logger.addHandler(console_handler)
+
+    return logger
+
+def log_api_call(service, operation, params=None, response=None, error=None):
+    """Helper to log API calls consistently"""
+    logger = logging.getLogger(f'aws.{service}')
+
+    if params is None:
+        params = {}
+
+    msg = f"API Call: {operation}"
+    if error:
+        logger.error(f"{msg} failed: {error}")
+        logger.debug(f"Parameters: {params}")
+    else:
+        logger.debug(f"{msg} succeeded")
+        logger.debug(f"Parameters: {params}")
+        logger.debug(f"Response: {response}")
+
+def log_scraping_progress(module, stage, items_processed=None, total_items=None):
+    """Helper to log scraping progress consistently"""
+    logger = logging.getLogger(f'scraper.{module}')
+
+    if items_processed is not None and total_items is not None:
+        progress = (items_processed / total_items) * 100
+        logger.info(f"{stage}: Processed {items_processed}/{total_items} items ({progress:.1f}%)")
+    else:
+        logger.info(f"{stage}")
\ No newline at end of file
diff --git a/parallel_pricing.py b/parallel_pricing.py
new file mode 100644
index 0000000..a3b9e6c
--- /dev/null
+++ b/parallel_pricing.py
@@ -0,0 +1,127 @@
+import boto3
+import json
+import logging
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from typing import Dict, List, Any
+
+class ParallelPricingCollector:
+    def __init__(self, max_workers=10):
+        self.max_workers = max_workers
+        self.logger = logging.getLogger('scraper.PricingCollector')
+
+    def _fetch_pricing_data_for_region(self, region: str, service_code: str, filters: List[Dict]) -> Dict:
+        """Fetch pricing data for a specific region"""
+        logger = logging.getLogger(f'scraper.pricing.{region}')
+
+        try:
+            # The Pricing API is only hosted in a few regions; us-east-1 works for all
+            pricing_client = boto3.client('pricing', region_name='us-east-1')
+            paginator = pricing_client.get_paginator('get_products')
+
+            products = []
+            for page in paginator.paginate(
+                ServiceCode=service_code,
+                Filters=[
+                    *filters,
+                    # The 'location' field expects human-readable descriptions like
+                    # "US East (N. Virginia)", so filter on 'regionCode', which
+                    # matches region codes like "us-east-1"
+                    {'Type': 'TERM_MATCH', 'Field': 'regionCode', 'Value': region}
+                ]
+            ):
+                products.extend(page['PriceList'])
+
+            return {'region': region, 'products': products}
+
+        except Exception as e:
+            logger.error(f"Error fetching pricing data for region {region}: {str(e)}")
+            return {'region': region, 'products': [], 'error': str(e)}
+
+    def _process_pricing_data(self, pricing_data: Dict, instance_map: Dict) -> None:
+        """Process pricing data for a region and update instance pricing information"""
+        try:
+            region = pricing_data['region']
+            products = pricing_data['products']
+
+            for price_data in products:
+                # boto3 returns PriceList entries as JSON strings, so parse them first
+                if isinstance(price_data, str):
+                    price_data = json.loads(price_data)
+
+                # Extract relevant pricing info
+                product = price_data.get('product', {})
+                attributes = product.get('attributes', {})
+
+                instance_type = attributes.get('instanceType')
+                if not instance_type or instance_type not in instance_map:
+                    continue
+
+                instance = instance_map[instance_type]
+
+                # Initialize pricing structure if needed
+                instance.pricing.setdefault(region, {})
+
+                # Process on-demand pricing
+                terms = price_data.get('terms', {})
+                ondemand_terms = terms.get('OnDemand', {})
+                for term_data in ondemand_terms.values():
+                    price_dimensions = term_data.get('priceDimensions', {})
+                    for dimension in price_dimensions.values():
+                        if 'USD' in dimension.get('pricePerUnit', {}):
+                            price = float(dimension['pricePerUnit']['USD'])
+                            operating_system = attributes.get('operatingSystem', 'Linux')
+                            preinstalled_software = attributes.get('preInstalledSw', 'NA')
+
+                            platform = self._translate_platform(operating_system, preinstalled_software)
+                            instance.pricing[region].setdefault(platform, {})
+                            instance.pricing[region][platform]['ondemand'] = price
+
+        except Exception as e:
+            self.logger.error(f"Error processing pricing data for region {region}: {str(e)}")
+
+    def _translate_platform(self, operating_system: str, preinstalled_software: str) -> str:
+        """Translate AWS platform names to internal representation"""
+        os_map = {
+            'Linux': 'linux',
+            'RHEL': 'rhel',
+            'SUSE': 'sles',
+            'Windows': 'mswin',
+            'Linux/UNIX': 'linux'
+        }
+
+        software_map = {
+            'NA': '',
+            'SQL Std': 'SQL',
+            'SQL Web': 'SQLWeb',
+            'SQL Ent': 'SQLEnterprise'
+        }
+
+        os_key = os_map.get(operating_system, 'linux')
+        software_key = software_map.get(preinstalled_software, '')
+        return os_key + software_key
+
+    def collect_pricing(self, instances: List[Any], regions: List[str]) -> None:
+        """Collect pricing data in parallel for all instances"""
+        instance_map = {i.instance_type: i for i in instances}
+
+        filters = [
+            {'Type': 'TERM_MATCH', 'Field': 'capacityStatus', 'Value': 'Used'},
+            {'Type': 'TERM_MATCH', 'Field': 'tenancy', 'Value': 'Shared'},
+            {'Type': 'TERM_MATCH', 'Field': 'licenseModel', 'Value': 'No License required'}
+        ]
+
+        # Initialize pricing dictionaries
+        for instance in instances:
+            instance.pricing = {}
+
+        # Fetch pricing data in parallel for all regions
+        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
+            future_to_region = {
+                executor.submit(
+                    self._fetch_pricing_data_for_region,
+                    region,
+                    'AmazonEC2',
+                    filters
+                ): region for region in regions
+            }
+
+            for future in as_completed(future_to_region):
+                region = future_to_region[future]
+                try:
+                    pricing_data = future.result()
+                    self._process_pricing_data(pricing_data, instance_map)
+                    self.logger.debug(f"Completed pricing collection for region {region}")
+                except Exception as e:
+                    self.logger.error(f"Failed to collect pricing for region {region}: {str(e)}")
\ No newline at end of file
diff --git a/parallel_scraper.py b/parallel_scraper.py
new file mode 100644
index 0000000..2a329a0
--- /dev/null
+++ b/parallel_scraper.py
@@ -0,0 +1,166 @@
+import boto3
+import logging
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from datetime import datetime
+from typing import Dict, List
+
+from logging_setup import log_api_call
+
+class ParallelScraper:
+    def __init__(self, max_workers=10):
+        self.max_workers = max_workers
+        self.logger = logging.getLogger('scraper.ParallelScraper')
+
+    def _fetch_instance_data(self, region: str) -> Dict:
+        """Fetch EC2 instance data for a specific region"""
+        try:
+            ec2_client = boto3.client('ec2', region_name=region)
+            paginator = ec2_client.get_paginator('describe_instance_types')
+
+            instances = []
+            for page in paginator.paginate():
+                instances.extend(page['InstanceTypes'])
+
+            self.logger.debug(f"Fetched {len(instances)} instances from {region}")
+            return {'region': region, 'instances': instances}
+
+        except Exception as e:
+            self.logger.error(f"Error fetching instance data for region {region}: {str(e)}")
+            return {'region': region, 'instances': []}
+
+    def _fetch_pricing_data(self, region: str, service_code: str, filters: List[Dict]) -> Dict:
+        """Fetch pricing data for a specific region and service"""
+        logger = logging.getLogger(f'scraper.pricing.{region}')
+
+        try:
+            logger.debug(f"Starting pricing data fetch for {service_code}")
+            pricing_client = boto3.client('pricing', region_name='us-east-1')
+            paginator = pricing_client.get_paginator('get_products')
+
+            products = []
+            page_count = 0
+
+            for page in paginator.paginate(ServiceCode=service_code, Filters=filters):
+                products.extend(page['PriceList'])
+                page_count += 1
+                logger.debug(f"Retrieved page {page_count} with {len(page['PriceList'])} products")
+
+            log_api_call('pricing', 'get_products',
+                         params={'ServiceCode': service_code, 'Filters': filters},
+                         response={'TotalProducts': len(products), 'Pages': page_count})
+
+            return {'region': region, 'data': products}
+
+        except Exception as e:
+            log_api_call('pricing', 'get_products',
+                         params={'ServiceCode': service_code, 'Filters': filters},
+                         error=str(e))
+            logger.error(f"Error fetching pricing data: {str(e)}", exc_info=True)
+            return {'region': region, 'data': []}
+
+    def parallel_pricing_fetch(self, regions: List[str], service_code: str, filters: List[Dict]) -> Dict[str, List]:
+        """Fetch pricing data for multiple regions in parallel"""
+        pricing_data = {}
+
+        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
+            future_to_region = {
+                executor.submit(self._fetch_pricing_data, region, service_code, filters): region
+                for region in regions
+            }
+
+            for future in as_completed(future_to_region):
+                result = future.result()
+                pricing_data[result['region']] = result['data']
+
+        return pricing_data
+
+    def parallel_instance_fetch(self, regions: List[str]) -> Dict[str, List]:
+        """Fetch instance data for multiple regions in parallel"""
+        instance_data = {}
+
+        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
+            future_to_region = {
+                executor.submit(self._fetch_instance_data, region): region
+                for region in regions
+            }
+
+            for future in as_completed(future_to_region):
+                result = future.result()
+                instance_data[result['region']] = result['instances']
+
+        return instance_data
+
+    def _fetch_spot_prices(self, region: str, instance_types: List[str]) -> Dict:
+        """Fetch spot price history for a region and instance types"""
+        try:
+            ec2_client = boto3.client('ec2', region_name=region)
+            prices = ec2_client.describe_spot_price_history(
+                InstanceTypes=instance_types,
+                StartTime=datetime.now()
+            )
+            return {'region': region, 'prices': prices['SpotPriceHistory']}
+        except Exception as e:
+ print(f"Error fetching spot prices for region {region}: {e}") + return {'region': region, 'prices': []} + + def parallel_spot_price_fetch(self, regions: List[str], instance_types: List[str]) -> Dict[str, List]: + """Fetch spot prices for multiple regions in parallel""" + spot_prices = {} + + with ThreadPoolExecutor(max_workers=self.max_workers) as executor: + future_to_region = { + executor.submit(self._fetch_spot_prices, region, instance_types): region + for region in regions + } + + for future in as_completed(future_to_region): + result = future.result() + spot_prices[result['region']] = result['prices'] + + return spot_prices + + def _fetch_availability_zones(self, region: str) -> Dict: + """Fetch availability zones for a region""" + try: + ec2_client = boto3.client('ec2', region_name=region) + response = ec2_client.describe_availability_zones() + return {'region': region, 'zones': response['AvailabilityZones']} + except Exception as e: + print(f"Error fetching AZs for region {region}: {e}") + return {'region': region, 'zones': []} + + def parallel_az_fetch(self, regions: List[str]) -> Dict[str, List]: + """Fetch availability zones for multiple regions in parallel""" + az_data = {} + + with ThreadPoolExecutor(max_workers=self.max_workers) as executor: + future_to_region = { + executor.submit(self._fetch_availability_zones, region): region + for region in regions + } + + for future in as_completed(future_to_region): + result = future.result() + az_data[result['region']] = result['zones'] + + return az_data \ No newline at end of file diff --git a/scrape.py b/scrape.py index 7fd8bb8..9b84a67 100755 --- a/scrape.py +++ b/scrape.py @@ -9,8 +9,16 @@ import requests import pickle import boto3 +import logging from six.moves.urllib import request as urllib2 +from parallel_scraper import ParallelScraper +from parallel_pricing import ParallelPricingCollector +from dedicated_price_collector import DedicatedPriceCollector +from logging_setup import setup_logging, log_api_call, log_scraping_progress + +logger = setup_logging() + # Following advice from https://stackoverflow.com/a/1779324/216138 # The locale must be installed in the system, and it must be one where ',' is # the thousans separator and '.' is the decimal fraction separator. @@ -385,15 +393,15 @@ def add_instance_storage_details(instances): client = boto3.client('ec2', region_name='us-east-1') pager = client.get_paginator("describe_instance_types") responses = pager.paginate(Filters=[{'Name': 'instance-storage-supported', 'Values': ['true']},{'Name': 'instance-type', 'Values': ['*']}]) - + for response in responses: instance_types = response['InstanceTypes'] - + for i in instances: - for instance_type in instance_types: + for instance_type in instance_types: if i.instance_type == instance_type["InstanceType"]: storage_info = instance_type["InstanceStorageInfo"] - + if storage_info: nvme_support = storage_info["NvmeSupport"] disk = storage_info["Disks"][0] @@ -740,7 +748,7 @@ def add_gpu_info(instances): "cuda_cores": 76928, "gpu_memory": 192, }, - "g6.xlarge": { + "g6.xlarge": { # GPU core count found from the whitepaper # https://images.nvidia.com/aem-dam/Solutions/Data-Center/l4/nvidia-ada-gpu-architecture-whitepaper-v2.1.pdf "gpu_model": "NVIDIA L4", @@ -1062,121 +1070,44 @@ def add_placement_groups(instances): def add_dedicated_info(instances): - # Dedicated Host is a physical server with EC2 instance capacity fully dedicated to a single customer. - # We treat it as another type of OS, like RHEL or SUSE. 
-
-    region_map = {value: key for key, value in ec2.get_region_descriptions().items()}
-    # Note: AWS GovCloud (US) is us-gov-west-1. This seems to be an exception just for dedicated hosts.
-    region_map["us-gov-west-1"] = "AWS GovCloud (US)"
-    region_map["us-west-2-lax"] = "US West (Los Angeles)"
-
-    # Normalize and translate term lengths and payment options to ec2instances.info terms
-    reserved_map = {
-        "1yrNoUpfront": "yrTerm1Standard.noUpfront",
-        "1yrPartialUpfront": "yrTerm1Standard.partialUpfront",
-        "1yrAllUpfront": "yrTerm1Standard.allUpfront",
-        "1 yrNoUpfront": "yrTerm1Standard.noUpfront",
-        "1 yrPartialUpfront": "yrTerm1Standard.partialUpfront",
-        "1 yrAllUpfront": "yrTerm1Standard.allUpfront",
-        "3yrNoUpfront": "yrTerm3Standard.noUpfront",
-        "3yrPartialUpfront": "yrTerm3Standard.partialUpfront",
-        "3yrAllUpfront": "yrTerm3Standard.allUpfront",
-        "3 yrNoUpfront": "yrTerm3Standard.noUpfront",
-        "3 yrPartialUpfront": "yrTerm3Standard.partialUpfront",
-        "3 yrAllUpfront": "yrTerm3Standard.allUpfront",
-    }
-
-    def format_price(price):
-        return str(float("%f" % float(price))).rstrip("0").rstrip(".")
-
-    def fetch_dedicated_prices():
-        all_pricing = {}
-
-        # On demand pricing, not all dedicated instances are available on demand
-        url = "https://b0.p.awsstatic.com/pricing/2.0/meteredUnitMaps/ec2/USD/current/dedicatedhost-ondemand.json"
-        od_pricing = fetch_data(url)
-        for region in od_pricing["regions"]:
-            all_pricing[region] = {}
-            for instance_description, dinst in od_pricing["regions"][region].items():
-                _price = {"ondemand": format_price(dinst["price"]), "reserved": {}}
-                all_pricing[region][dinst["Instance Type"]] = _price
-
-        # All of the reserved pricing is at different URLs
-        for region in od_pricing["regions"]:
-            for term in ["3 year", "1 year"]:
-                for payment in ["No Upfront", "Partial Upfront", "All Upfront"]:
-                    base = f"https://b0.p.awsstatic.com/pricing/2.0/meteredUnitMaps/ec2/USD/current/dedicatedhost-reservedinstance-virtual/"
-                    path = f"{region}/{term}/{payment}/index.json".replace(" ", "%20")
-
+    """Add dedicated host pricing information to instances"""
+    logger = logging.getLogger('add_dedicated_info')
+
+    try:
+        # Create price collector and fetch prices
+        collector = DedicatedPriceCollector(max_workers=10)
+        all_pricing = collector.fetch_dedicated_prices()
+
+        region_map = {value: key for key, value in ec2.get_region_descriptions().items()}
+        region_map["us-gov-west-1"] = "AWS GovCloud (US)"
+        region_map["us-west-2-lax"] = "US West (Los Angeles)"
+
+        # Update instance pricing information
+        for inst in instances:
+            if not inst.pricing:
+                # Handle instances that are ONLY available with dedicated host pricing
+                inst_type = inst.instance_type.split(".")[0]
+                for k, r in region_map.items():
+                    region = ec2.canonicalize_location(r, False)
+                    if region in all_pricing and inst_type in all_pricing[region]:
+                        _price = all_pricing[region][inst_type]
+                        inst.regions[k] = region
+                        inst.pricing[k] = {}
+                        inst.pricing[k]["dedicated"] = _price
+            else:
+                # Add dedicated pricing to instances with existing pricing info
+                for region in inst.pricing:
                     try:
-                        pricing = fetch_data(base + path)
-                    except:
-                        print(
region={}, term={}, payment={}".format( - region, term, payment - ) - ) - continue - - for instance_description, dinst in pricing["regions"][ - region - ].items(): - # Similar to get_reserved_pricing in ec2.py the goal is to get the effective hourly rate - # and then the frontend will deal with making it monthly, yearly etc - upfront = 0.0 - if "Partial" in payment or "All" in payment: - upfront = float(dinst["riupfront:PricePerUnit"]) - inst_type = dinst["Instance Type"] - ondemand = float(dinst["price"]) - lease_in_years = int(dinst["LeaseContractLength"][0]) - hours_in_term = lease_in_years * 365 * 24 - price = float(ondemand) + (float(upfront) / hours_in_term) - translate_ri = reserved_map[ - dinst["LeaseContractLength"] + dinst["PurchaseOption"] + _price = all_pricing[region_map[region]][ + inst.instance_type.split(".")[0] ] - - # Certain instances will not have been created above because they are not available on demand - if inst_type not in all_pricing[region]: - all_pricing[region][inst_type] = {"reserved": {}} - - all_pricing[region][inst_type]["reserved"][translate_ri] = ( - format_price(price) - ) - - return all_pricing - - all_pricing = fetch_dedicated_prices() - for inst in instances: - if not inst.pricing: - # Less than 10 instances are ONLY available with dedicated host pricing - # In this case there is no prior pricing dict to add the dedicated prices to so - # create a new one. Unfortunately we have to search by instance but the - # previous dedicated pricing dict we have built is by region. - inst_type = inst.instance_type.split(".")[0] - for k, r in region_map.items(): - region = ec2.canonicalize_location(r, False) - if inst_type in all_pricing[region]: - _price = all_pricing[region][inst_type] - inst.regions[k] = region - inst.pricing[k] = {} - inst.pricing[k]["dedicated"] = _price - else: - for region in inst.pricing: - # Add the 'dedicated' price to the price list as a top level key per region. 
-                # Dedicated hosts are not associated with any type of software like rhel or mswin
-                # Not all instances are available as dedicated hosts
-                try:
-                    _price = all_pricing[region_map[region]][
-                        inst.instance_type.split(".")[0]
-                    ]
-                    inst.pricing[region]["dedicated"] = _price
-                except KeyError:
-                    pass
-                    # print(
-                    #     "No dedicated host price for %s in %s"
-                    #     % (inst.instance_type, region)
-                    # )
-
+                        inst.pricing[region]["dedicated"] = _price
+                    except KeyError:
+                        pass
+
+    except Exception as e:
+        logger.error(f"Error adding dedicated host information: {str(e)}")
+        raise
 
 def add_spot_interrupt_info(instances):
     """
@@ -1225,48 +1156,186 @@
         )
         instance.pricing[region][os_id]["spot_avg"] = f"{est_spot:.6f}"
 
+class InstanceScraper:
+    def __init__(self, max_workers=10):
+        self.logger = logging.getLogger('scraper.InstanceScraper')
+        self.scraper = ParallelScraper(max_workers=max_workers)
+        self.regions = list(ec2.describe_regions())
+        self.instance_map = {}
+        self.all_instances = []
+
+    def _process_instance_info(self, instance_info):
+        """Process raw instance information into an Instance object"""
+        try:
+            inst = Instance()
+            inst.instance_type = instance_info['InstanceType']
+            inst.api_description = instance_info
+
+            # Process CPU info
+            if 'ProcessorInfo' in instance_info:
+                inst.arch = instance_info['ProcessorInfo']['SupportedArchitectures']
+                if 'SustainedClockSpeedInGhz' in instance_info['ProcessorInfo']:
+                    inst.clock_speed_ghz = str(instance_info['ProcessorInfo']['SustainedClockSpeedInGhz'])
+
+            # Process memory info
+            if 'MemoryInfo' in instance_info:
+                inst.memory = instance_info['MemoryInfo']['SizeInMiB'] / 1024.0
+
+            # Process vCPU info
+            if 'VCpuInfo' in instance_info:
+                inst.vCPU = instance_info['VCpuInfo']['DefaultVCpus']
+
+            # Process network info
+            if 'NetworkInfo' in instance_info:
+                net_info = instance_info['NetworkInfo']
+                inst.network_performance = net_info['NetworkPerformance']
+                if net_info.get('EnaSupport') == 'required':
+                    inst.ebs_as_nvme = True
+                inst.vpc = {
+                    'max_enis': net_info['MaximumNetworkInterfaces'],
+                    'ips_per_eni': net_info['Ipv4AddressesPerInterface']
+                }
+
+            return inst
+        except Exception as e:
+            self.logger.error(f"Error processing instance info for {instance_info.get('InstanceType', 'unknown')}: {str(e)}")
+            raise
+
+    def collect_instance_data(self):
+        """Collect basic instance type information"""
+        self.logger.info("Starting instance data collection...")
+        try:
+            instance_data = self.scraper.parallel_instance_fetch(self.regions)
+            processed = 0
+
+            for region, instances in instance_data.items():
+                self.logger.debug(f"Processing {len(instances)} instances from {region}")
+                for instance_info in instances:
+                    instance_type = instance_info['InstanceType']
+                    if instance_type not in self.instance_map:
+                        inst = self._process_instance_info(instance_info)
+                        self.instance_map[instance_type] = inst
+                        self.all_instances.append(inst)
+                        processed += 1
+
+            self.logger.info(f"Completed instance data collection. Found {len(self.instance_map)} unique instance types")
+
+        except Exception as e:
+            self.logger.error(f"Failed to collect instance data: {str(e)}", exc_info=True)
+            raise
+
+    def collect_pricing_data(self):
+        """Collect pricing information using parallel processing"""
+        self.logger.info("Starting parallel pricing data collection...")
+        try:
+            # Create pricing collector instance
+            collector = ParallelPricingCollector(max_workers=10)
+
+            # Collect pricing data in parallel
+            collector.collect_pricing(self.all_instances, self.regions)
+
+            self.logger.info("Completed parallel pricing data collection")
+
+        except Exception as e:
+            self.logger.error(f"Failed to collect pricing data: {str(e)}")
+            raise
+
+    def collect_additional_info(self):
+        """Collect all additional instance information"""
+        try:
+            self.logger.info("Collecting additional instance information...")
+
+            # ENI info
+            self.logger.info("Adding ENI information...")
+            add_eni_info(self.all_instances)
+
+            # Linux AMI info
+            self.logger.info("Adding Linux AMI information...")
+            add_linux_ami_info(self.all_instances)
+
+            # VPC only info
+            self.logger.info("Adding VPC-only information...")
+            add_vpconly_detail(self.all_instances)
+
+            # Instance storage
+            self.logger.info("Adding instance storage details...")
+            add_instance_storage_details(self.all_instances)
+
+            # T2 credits
+            self.logger.info("Adding T2 credit information...")
+            add_t2_credits(self.all_instances)
+
+            # Pretty names
+            self.logger.info("Adding pretty names...")
+            add_pretty_names(self.all_instances)
+
+            # EMR info
+            self.logger.info("Adding EMR details...")
+            add_emr_info(self.all_instances)
+
+            # GPU info
+            self.logger.info("Adding GPU details...")
+            add_gpu_info(self.all_instances)
+
+            # Availability zones (kept from the old serial scrape flow)
+            self.logger.info("Adding availability zone details...")
+            add_availability_zone_info(self.all_instances)
+
+            # Placement groups
+            self.logger.info("Adding placement group details...")
+            add_placement_groups(self.all_instances)
+
+            # Dedicated host pricing
+            self.logger.info("Adding dedicated host pricing...")
+            add_dedicated_info(self.all_instances)
+
+            # Spot interrupt info
+            self.logger.info("Adding spot interrupt details...")
+            add_spot_interrupt_info(self.all_instances)
+
+            self.logger.info("Completed collecting additional information")
+
+        except Exception as e:
+            self.logger.error(f"Failed to collect additional information: {str(e)}", exc_info=True)
+            raise
+
+    def scrape_all(self):
+        """Execute complete scraping process"""
+        try:
+            self.logger.info("Starting complete scraping process")
+
+            # Collect instance data
+            self.collect_instance_data()
+
+            # Collect pricing data
+            self.collect_pricing_data()
+
+            # Collect additional information
+            self.collect_additional_info()
+
+            self.logger.info("Completed scraping process successfully")
+
+        except Exception as e:
+            self.logger.error(f"Error during scraping process: {str(e)}")
+            raise
 
 def scrape(data_file):
-    """Scrape AWS to get instance data"""
-    print("Parsing instance types...")
-    all_instances = ec2.get_instances()
-    print("Parsing pricing info...")
-    add_pricing_info(all_instances)
-    print("Parsing ENI info...")
-    add_eni_info(all_instances)
-    print("Parsing Linux AMI info...")
-    add_linux_ami_info(all_instances)
-    print("Parsing VPC-only info...")
-    add_vpconly_detail(all_instances)
-    print("Parsing local instance storage...")
-    add_instance_storage_details(all_instances)
-    print("Parsing burstable instance credits...")
-    add_t2_credits(all_instances)
-    print("Parsing instance names...")
-    add_pretty_names(all_instances)
-    print("Parsing emr details...")
-    add_emr_info(all_instances)
-    print("Adding GPU details...")
-    add_gpu_info(all_instances)
-    print("Adding availability zone details...")
-    add_availability_zone_info(all_instances)
-    print("Adding placement group details...")
-    add_placement_groups(all_instances)
-    print("Adding dedicated host pricing...")
-    add_dedicated_info(all_instances)
-    print("Adding spot interrupt details...")
-    add_spot_interrupt_info(all_instances)
-
-    os.makedirs(os.path.dirname(data_file), exist_ok=True)
-    with open(data_file, "w+") as f:
-        json.dump(
-            [i.to_dict() for i in all_instances],
-            f,
-            indent=1,
-            sort_keys=True,
-            separators=(",", ": "),
-        )
+    """Main scraping function with improved error handling"""
+    logger = logging.getLogger(__name__)
+    try:
+        logger.info(f"Starting EC2 instance scraping process for {data_file}")
+        scraper = InstanceScraper(max_workers=10)
+        scraper.scrape_all()
+
+        # Save results
+        logger.info(f"Saving results to {data_file}")
+        os.makedirs(os.path.dirname(data_file), exist_ok=True)
+        with open(data_file, "w") as f:
+            # Keep the old sort_keys/separators settings so the generated
+            # JSON stays deterministic and diffs cleanly between runs
+            json.dump(
+                [i.to_dict() for i in scraper.all_instances],
+                f,
+                indent=1,
+                sort_keys=True,
+                separators=(",", ": "),
+            )
+
+        logger.info("EC2 instance scraping process completed successfully")
+
+    except Exception as e:
+        logger.error(f"Failed to complete EC2 instance scraping process: {str(e)}")
+        logger.debug(traceback.format_exc())
+        raise
 
 if __name__ == "__main__":
     scrape("www/instances.json")
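
Reviewer note: the new modules are importable on their own, so the pipeline can be exercised without going through the __main__ entry point. A minimal usage sketch under the patch's own names (the www/instances.json path mirrors the __main__ block; running this assumes AWS credentials with ec2:Describe* and pricing:GetProducts access, which is an assumption about the environment rather than part of this diff):

    from logging_setup import setup_logging
    from dedicated_price_collector import DedicatedPriceCollector
    import scrape

    logger = setup_logging()

    # Full pipeline: instance data, pricing, and the extra metadata passes,
    # then write the merged result to the JSON data file.
    scrape.scrape("www/instances.json")

    # The dedicated host collector also works standalone, which is handy for
    # debugging dedicated pricing without waiting for a full scrape.
    collector = DedicatedPriceCollector(max_workers=10)
    dedicated = collector.fetch_dedicated_prices()
    logger.info("Got dedicated pricing for %d regions", len(dedicated))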
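
Reviewer note: parallel_pricing.py filters get_products on the Price List API's regionCode field, because the location field expects human-readable descriptions such as "US East (N. Virginia)" rather than codes like "us-east-1". A quick standalone check of that assumption (requires configured AWS credentials; MaxResults is a documented get_products parameter):

    import boto3

    # The Price List API itself is only hosted in a couple of regions;
    # us-east-1 serves pricing for all of them.
    pricing = boto3.client("pricing", region_name="us-east-1")
    resp = pricing.get_products(
        ServiceCode="AmazonEC2",
        Filters=[{"Type": "TERM_MATCH", "Field": "regionCode", "Value": "us-east-1"}],
        MaxResults=1,
    )
    # Expect exactly one product back if the regionCode filter is honored.
    print(len(resp["PriceList"]))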