Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT] DO NOT MERGE Initial code for parallel EC2 scraping #758

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 178 additions & 0 deletions dedicated_price_collector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import requests
import json
import gzip
import re
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, Any
import threading
from urllib.parse import quote

class DedicatedPriceCollector:
    """Collects AWS EC2 dedicated-host pricing (on-demand and reserved)
    from the public awsstatic.com pricing endpoints, fetching the many
    region/term/payment combinations with a thread pool."""

    # Maps the API's concatenated LeaseContractLength + PurchaseOption
    # strings (both the spaced and unspaced variants appear in responses)
    # to the internal reserved-term keys.
    _RESERVED_TERM_MAP = {
        "1yrNoUpfront": "yrTerm1Standard.noUpfront",
        "1yrPartialUpfront": "yrTerm1Standard.partialUpfront",
        "1yrAllUpfront": "yrTerm1Standard.allUpfront",
        "1 yrNoUpfront": "yrTerm1Standard.noUpfront",
        "1 yrPartialUpfront": "yrTerm1Standard.partialUpfront",
        "1 yrAllUpfront": "yrTerm1Standard.allUpfront",
        "3yrNoUpfront": "yrTerm3Standard.noUpfront",
        "3yrPartialUpfront": "yrTerm3Standard.partialUpfront",
        "3yrAllUpfront": "yrTerm3Standard.allUpfront",
        "3 yrNoUpfront": "yrTerm3Standard.noUpfront",
        "3 yrPartialUpfront": "yrTerm3Standard.partialUpfront",
        "3 yrAllUpfront": "yrTerm3Standard.allUpfront",
    }

    def __init__(self, max_workers: int = 10):
        """
        Args:
            max_workers: Maximum number of threads used for the parallel
                reserved-price fetches.
        """
        self.max_workers = max_workers
        self.logger = logging.getLogger('DedicatedPriceCollector')
        # Serializes updates to the shared pricing dict across workers.
        self.lock = threading.Lock()

    def _fetch_data(self, url: str) -> Dict:
        """Fetch *url* and parse the payload as JSON.

        Handles the three payload shapes served by the awsstatic pricing
        endpoints: plain JSON, gzip-compressed JSON (served without a
        content-encoding header), and JSONP ("callback({...});").

        Raises:
            requests.HTTPError: for a non-2xx response.
            ValueError: if the payload cannot be parsed as JSON or JSONP.
        """
        try:
            # Bounded timeout so a stalled endpoint cannot hang a worker
            # thread forever (the original call had no timeout).
            response = requests.get(url, timeout=30)
            response.raise_for_status()  # Raises an HTTPError for bad responses
            content = response.content

            try:
                content = content.decode()
            except UnicodeDecodeError:
                content = gzip.decompress(content).decode()

            try:
                data = json.loads(content)
            except ValueError:
                # If the data isn't plain JSON, try to parse as JSONP:
                # strip the callback wrapper and quote the bare object
                # keys so the remainder is valid JSON.
                match = re.search(r"callback\((.*)\);", content)
                if match is None:
                    raise ValueError(f"Unrecognized payload from {url}")
                json_string = re.sub(r"(\w+):", r'"\1":', match.group(1))
                data = json.loads(json_string)

            return data

        except Exception as e:
            self.logger.error(f"Error fetching data from {url}: {str(e)}")
            raise

    def _format_price(self, price: float) -> str:
        """Format *price* as a decimal string without trailing zeros,
        e.g. 1.500000 -> '1.5', 10.0 -> '10'."""
        return str(float("%f" % float(price))).rstrip('0').rstrip('.')

    def _fetch_ondemand_prices(self) -> Dict[str, Any]:
        """Fetch on-demand pricing for dedicated hosts in every region.

        Returns:
            {region: {instance_type: {"ondemand": price_str,
                                      "reserved": {}}}}

        Raises:
            Exception: re-raised after logging; on-demand data is the
                skeleton for everything else, so failure is fatal.
        """
        url = "https://b0.p.awsstatic.com/pricing/2.0/meteredUnitMaps/ec2/USD/current/dedicatedhost-ondemand.json"
        try:
            od_pricing = self._fetch_data(url)
            all_pricing: Dict[str, Any] = {}

            for region, instances in od_pricing["regions"].items():
                all_pricing[region] = {}
                # Keys of *instances* are human-readable descriptions; the
                # useful identifier is the "Instance Type" attribute.
                for dinst in instances.values():
                    all_pricing[region][dinst["Instance Type"]] = {
                        "ondemand": self._format_price(dinst["price"]),
                        "reserved": {},
                    }

            return all_pricing
        except Exception as e:
            self.logger.error(f"Error fetching on-demand pricing: {str(e)}")
            raise

    def _fetch_reserved_price(self, params: Dict) -> list:
        """Fetch reserved pricing for one region/term/payment combination.

        Args:
            params: {'region': ..., 'term': ..., 'payment': ...}.

        Returns:
            A list of {'region', 'instance_type', 'pricing_term', 'price'}
            dicts; empty on any failure (logged, never raised) so one bad
            combination cannot abort the whole collection.
        """
        region = params['region']
        term = params['term']
        payment = params['payment']

        try:
            base = "https://b0.p.awsstatic.com/pricing/2.0/meteredUnitMaps/ec2/USD/current/dedicatedhost-reservedinstance-virtual/"
            path = f"{quote(region)}/{quote(term)}/{quote(payment)}/index.json"

            pricing = self._fetch_data(base + path)
            results = []

            for dinst in pricing["regions"][region].values():
                # "No Upfront" rows carry no upfront fee.
                upfront = 0.0
                if "Partial" in payment or "All" in payment:
                    upfront = float(dinst["riupfront:PricePerUnit"])

                # Effective hourly price: hourly rate plus the upfront fee
                # amortized over the whole lease (years * 365 * 24 hours).
                hourly = float(dinst["price"])
                lease_in_years = int(dinst["LeaseContractLength"][0])
                hours_in_term = lease_in_years * 365 * 24
                price = hourly + upfront / hours_in_term

                results.append({
                    'region': region,
                    'instance_type': dinst["Instance Type"],
                    'pricing_term': self._RESERVED_TERM_MAP[
                        dinst["LeaseContractLength"] + dinst["PurchaseOption"]
                    ],
                    'price': self._format_price(price),
                })

            return results

        except Exception as e:
            self.logger.warning(
                f"Failed to fetch reserved pricing for region={region}, term={term}, "
                f"payment={payment}: {str(e)}"
            )
            return []

    def _update_pricing_dict(self, all_pricing: Dict, result: Dict) -> None:
        """Thread-safe merge of one reserved-price *result* into
        *all_pricing* (shape documented on _fetch_ondemand_prices)."""
        with self.lock:
            region = result['region']
            inst_type = result['instance_type']
            pricing_term = result['pricing_term']
            price = result['price']

            # Reserved data can mention types absent from the on-demand
            # feed; create a bare entry rather than dropping the price.
            if inst_type not in all_pricing[region]:
                all_pricing[region][inst_type] = {"reserved": {}}

            all_pricing[region][inst_type]["reserved"][pricing_term] = price

    def fetch_dedicated_prices(self) -> Dict:
        """Fetch all dedicated host pricing data using parallel processing.

        Returns:
            {region: {instance_type: {"ondemand": ..., "reserved": {...}}}}
        """
        self.logger.info("Starting dedicated price collection")

        # On-demand pricing is fetched first and defines the region set.
        all_pricing = self._fetch_ondemand_prices()

        # One fetch task per (region, term, payment) combination.
        fetch_params = []
        terms = ["3 year", "1 year"]
        payments = ["No Upfront", "Partial Upfront", "All Upfront"]

        for region in all_pricing.keys():
            for term in terms:
                for payment in payments:
                    fetch_params.append({
                        'region': region,
                        'term': term,
                        'payment': payment
                    })

        # Fetch reserved pricing in parallel and merge as results arrive.
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_params = {
                executor.submit(self._fetch_reserved_price, params): params
                for params in fetch_params
            }

            for future in as_completed(future_to_params):
                params = future_to_params[future]
                try:
                    results = future.result()
                    for result in results:
                        self._update_pricing_dict(all_pricing, result)
                except Exception as e:
                    self.logger.error(
                        f"Error processing results for region={params['region']}, "
                        f"term={params['term']}, payment={params['payment']}: {str(e)}"
                    )

        self.logger.info("Completed dedicated price collection")
        return all_pricing
63 changes: 63 additions & 0 deletions logging_setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import logging
import os
import sys
from datetime import datetime

def setup_logging(log_level=logging.INFO):
    """Configure root logging for the AWS instance scraper.

    Log records go to both a timestamped file under ./logs (detailed
    format) and stdout (terse format).

    Args:
        log_level: Threshold applied to the root logger.

    Returns:
        The configured root logger.
    """
    # exist_ok avoids the check-then-create race of a separate
    # os.path.exists() guard.
    os.makedirs('logs', exist_ok=True)

    # One log file per run, named by start time.
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    log_filename = f'logs/scraper_{timestamp}.log'

    # Configure root logger
    logger = logging.getLogger()
    logger.setLevel(log_level)

    # NOTE(review): every call appends fresh handlers, so calling this
    # twice would duplicate each log line; callers should invoke it once
    # at startup.

    # File handler with detailed formatting
    file_handler = logging.FileHandler(log_filename)
    file_handler.setFormatter(logging.Formatter(
        '%(asctime)s | %(levelname)-8s | %(name)s | %(message)s'
    ))
    logger.addHandler(file_handler)

    # Console handler with simpler formatting
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(logging.Formatter(
        '%(levelname)-8s: %(message)s'
    ))
    logger.addHandler(console_handler)

    return logger

def log_api_call(service, operation, params=None, response=None, error=None):
    """Log one AWS API call in a consistent format.

    A failed call (truthy *error*) is logged at ERROR level; a successful
    one at DEBUG. The call parameters are always logged at DEBUG level,
    and the response only on success.
    """
    api_logger = logging.getLogger(f'aws.{service}')
    call_desc = f"API Call: {operation}"
    safe_params = params if params is not None else {}

    if error:
        api_logger.error(f"{call_desc} failed: {error}")
        api_logger.debug(f"Parameters: {safe_params}")
    else:
        api_logger.debug(f"{call_desc} succeeded")
        api_logger.debug(f"Parameters: {safe_params}")
        api_logger.debug(f"Response: {response}")

def log_scraping_progress(module, stage, items_processed=None, total_items=None):
    """Log scraping progress in a consistent format.

    When both counters are supplied the message includes the processed/total
    counts and a percentage; otherwise only the stage name is logged.
    """
    progress_logger = logging.getLogger(f'scraper.{module}')

    have_counts = items_processed is not None and total_items is not None
    if not have_counts:
        progress_logger.info(f"{stage}")
        return

    progress = (items_processed / total_items) * 100
    progress_logger.info(
        f"{stage}: Processed {items_processed}/{total_items} items ({progress:.1f}%)"
    )
127 changes: 127 additions & 0 deletions parallel_pricing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import boto3
import concurrent.futures
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Any

class ParallelPricingCollector:
    """Collects EC2 on-demand pricing from the AWS Pricing API, querying
    regions in parallel and merging results into instance objects."""

    def __init__(self, max_workers=10):
        """
        Args:
            max_workers: Maximum number of concurrent region fetches.
        """
        self.max_workers = max_workers
        self.logger = logging.getLogger('scraper.PricingCollector')

    def _fetch_pricing_data_for_region(self, region: str, service_code: str, filters: List[Dict]) -> Dict:
        """Fetch all pricing products for one region.

        Returns:
            {'region': ..., 'products': [...]}; on failure the products
            list is empty and an 'error' key is added (never raises, so
            one bad region cannot sink the whole parallel run).
        """
        logger = logging.getLogger(f'scraper.pricing.{region}')

        try:
            pricing_client = boto3.client('pricing', region_name='us-east-1')  # Pricing API only available in us-east-1
            paginator = pricing_client.get_paginator('get_products')

            products = []
            for page in paginator.paginate(
                ServiceCode=service_code,
                Filters=[
                    *filters,
                    {'Type': 'TERM_MATCH', 'Field': 'location', 'Value': region}
                ]
            ):
                products.extend(page['PriceList'])

            return {'region': region, 'products': products}

        except Exception as e:
            logger.error(f"Error fetching pricing data for region {region}: {str(e)}")
            return {'region': region, 'products': [], 'error': str(e)}

    def _process_pricing_data(self, pricing_data: Dict, instance_map: Dict) -> None:
        """Merge one region's pricing products into *instance_map*.

        Writes instance.pricing[region][platform]['ondemand'] = price
        for every matching instance type. Errors are logged, not raised.
        """
        import json  # local import: the module header does not import json

        # Resolve the region before the try block so the error message in
        # the handler can never hit an unbound local (original bug when
        # pricing_data lacked the 'region' key).
        region = pricing_data.get('region', 'unknown')
        try:
            products = pricing_data['products']

            for price_data in products:
                # get_products returns each PriceList entry as a JSON
                # string; decode before treating it as a mapping.
                if isinstance(price_data, str):
                    price_data = json.loads(price_data)

                product = price_data.get('product', {})
                attributes = product.get('attributes', {})

                # Skip products for instance types we are not tracking.
                instance_type = attributes.get('instanceType')
                if not instance_type or instance_type not in instance_map:
                    continue

                instance = instance_map[instance_type]
                instance.pricing.setdefault(region, {})

                # Process on-demand pricing terms only.
                terms = price_data.get('terms', {})
                ondemand_terms = terms.get('OnDemand', {})
                for term_data in ondemand_terms.values():
                    price_dimensions = term_data.get('priceDimensions', {})
                    for dimension in price_dimensions.values():
                        if 'USD' in dimension.get('pricePerUnit', {}):
                            price = float(dimension['pricePerUnit']['USD'])
                            operating_system = attributes.get('operatingSystem', 'Linux')
                            preinstalled_software = attributes.get('preInstalledSw', 'NA')

                            platform = self._translate_platform(operating_system, preinstalled_software)
                            instance.pricing[region].setdefault(platform, {})
                            instance.pricing[region][platform]['ondemand'] = price

        except Exception as e:
            self.logger.error(f"Error processing pricing data for region {region}: {str(e)}")

    def _translate_platform(self, operating_system: str, preinstalled_software: str) -> str:
        """Translate AWS OS / pre-installed-software names to the internal
        platform key (e.g. 'Windows' + 'SQL Std' -> 'mswinSQL').

        Unknown operating systems fall back to 'linux'; unknown software
        contributes no suffix.
        """
        os_map = {
            'Linux': 'linux',
            'RHEL': 'rhel',
            'SUSE': 'sles',
            'Windows': 'mswin',
            'Linux/UNIX': 'linux'
        }

        software_map = {
            'NA': '',
            'SQL Std': 'SQL',
            'SQL Web': 'SQLWeb',
            'SQL Ent': 'SQLEnterprise'
        }

        os_key = os_map.get(operating_system, 'linux')
        software_key = software_map.get(preinstalled_software, '')
        return os_key + software_key

    def collect_pricing(self, instances: List[Any], regions: List[str]) -> None:
        """Collect on-demand pricing in parallel for all *instances*.

        Resets each instance's ``pricing`` attribute to an empty dict and
        fills it per region as fetches complete. Instances are matched by
        their ``instance_type`` attribute.
        """
        instance_map = {i.instance_type: i for i in instances}

        # Restrict to standard shared-tenancy, no-license products.
        filters = [
            {'Type': 'TERM_MATCH', 'Field': 'capacityStatus', 'Value': 'Used'},
            {'Type': 'TERM_MATCH', 'Field': 'tenancy', 'Value': 'Shared'},
            {'Type': 'TERM_MATCH', 'Field': 'licenseModel', 'Value': 'No License required'}
        ]

        # Initialize pricing dictionaries
        for instance in instances:
            instance.pricing = {}

        # Fetch pricing data in parallel for all regions
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_region = {
                executor.submit(
                    self._fetch_pricing_data_for_region,
                    region,
                    'AmazonEC2',
                    filters
                ): region for region in regions
            }

            for future in as_completed(future_to_region):
                region = future_to_region[future]
                try:
                    pricing_data = future.result()
                    self._process_pricing_data(pricing_data, instance_map)
                    self.logger.debug(f"Completed pricing collection for region {region}")
                except Exception as e:
                    self.logger.error(f"Failed to collect pricing for region {region}: {str(e)}")
Loading
Loading