-
Notifications
You must be signed in to change notification settings - Fork 5
/
locust-load-test.py
91 lines (71 loc) · 2.76 KB
/
locust-load-test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import os
import boto3
import json
import random
import time
import uuid
import logging
from locust import User, task, constant, events
from faker import Faker
# Runtime configuration, read once from the environment at import time.
# An unset or empty variable falls back to the default on the right.
REGION = os.environ.get("REGION") or "eu-central-1"
BATCH_SIZE = int(os.environ.get("LOCUST_BATCH_SIZE") or 1)
# Module-level Faker instance, created once at import time.
# NOTE(review): nothing in the visible part of this file references it - confirm whether it is still needed.
faker = Faker()
class KinesisBotoClient:
    """Thin boto3 Kinesis wrapper that reports its requests to Locust.

    Each call to :meth:`send` issues a single ``put_records`` request and then
    fires ``batch_size`` Locust request events, so statistics are counted per
    record rather than per API call.
    """

    def __init__(self, region_name, stream_name, batch_size):
        """Create the underlying boto3 Kinesis client.

        Args:
            region_name: AWS region passed to ``boto3.client``.
            stream_name: Name of the Kinesis stream records are written to.
            batch_size: Number of Locust request events fired per ``send``.
        """
        self.kinesis_client = boto3.client('kinesis', region_name=region_name)
        self.stream_name = stream_name
        self.batch_size = batch_size
        logging.info("Created KinesisBotoClient in '%s' for Kinesis stream '%s'", region_name, stream_name)

    @staticmethod
    def _record_errors(response, record_count):
        """Map a ``put_records`` response to one outcome per reported record.

        Per the AWS PutRecords API, a call can succeed as a whole while
        individual entries of ``response["Records"]`` carry an ``ErrorCode``
        (and ``ErrorMessage``) instead of a sequence number.

        Args:
            response: The dict returned by ``put_records`` (``None`` or a
                response without ``Records`` is treated as all-accepted).
            record_count: Length of the returned list.

        Returns:
            A list of ``record_count`` items: ``None`` for an accepted record,
            a ``RuntimeError`` describing the rejection otherwise. Positions
            beyond the end of ``response["Records"]`` are reported as ``None``.
        """
        results = (response or {}).get("Records") or []
        outcomes = []
        for index in range(record_count):
            entry = results[index] if index < len(results) else {}
            code = entry.get("ErrorCode")
            if code:
                outcomes.append(RuntimeError(f"{code}: {entry.get('ErrorMessage', '')}"))
            else:
                outcomes.append(None)
        return outcomes

    def send(self, records):
        """Write *records* to the stream and report the outcome to Locust.

        Fires ``batch_size`` request events, all sharing the measured round
        trip time of the single ``put_records`` call. If the call itself
        raises, every event carries that exception; otherwise each event
        carries the per-record error from the response, or ``None`` when the
        record was accepted.

        Args:
            records: List of ``{'Data': ..., 'PartitionKey': ...}`` dicts as
                expected by ``put_records``.
        """
        request_meta = {
            "request_type": "Send data",
            "name": "Kinesis",
            "start_time": time.time(),
            "response_length": 0,
            "response": None,
            "context": {},
            "exception": None,
        }
        start_perf_counter = time.perf_counter()
        try:
            response = self.kinesis_client.put_records(
                StreamName=self.stream_name, Records=records)
        except Exception as e:
            # Deliberately broad: a load test must record any failure of the
            # request instead of aborting the simulated user.
            outcomes = [e] * self.batch_size
        else:
            # A successful call may still contain rejected records (e.g.
            # throttling); surface them instead of counting them as successes.
            outcomes = self._record_errors(response, self.batch_size)
        request_meta["response_time"] = (
            time.perf_counter() - start_perf_counter) * 1000
        for outcome in outcomes:
            request_meta["exception"] = outcome
            events.request.fire(**request_meta)
class KinesisBotoUser(User):
    """Base Locust user whose ``client`` attribute is a KinesisBotoClient.

    The stream name is taken from ``self.host``; region and batch size come
    from the module-level ``REGION`` and ``BATCH_SIZE`` settings.
    """

    # Locust's flag for base classes that are meant to be subclassed
    # (see the Locust User documentation).
    abstract = True

    def __init__(self, env):
        """Initialise the Locust user, then attach the Kinesis client."""
        super().__init__(env)
        stream = self.host
        self.client = KinesisBotoClient(REGION, stream, BATCH_SIZE)
class SensorAPIUser(KinesisBotoUser):
    """Simulated sensor that sends one batch of readings per task run."""

    # Wait-time setting built from Locust's constant(1) helper.
    wait_time = constant(1)

    @staticmethod
    def _status_for(temperature):
        """Choose the status string reported alongside *temperature*.

        Above 160 the status is always "ERROR". Above 140, or when a random
        draw from ``randrange(1, 100)`` exceeds 80, it is picked at random
        from "WARNING" and "ERROR". Otherwise it is "OK".
        """
        if temperature > 160:
            return "ERROR"
        if temperature > 140 or random.randrange(1, 100) > 80:
            return random.choice(["WARNING", "ERROR"])
        return "OK"

    def generate_sensor_reading(self, sensor_id, sensor_reading):
        """Build one synthetic reading.

        Args:
            sensor_id: Prefix of the generated ``sensorId``.
            sensor_reading: Suffix of the generated ``sensorId`` (the caller
                in this class passes the index within the batch).

        Returns:
            A dict with ``sensorId``, ``temperature`` (random value in
            [10, 180), rounded to two decimals), ``status`` and ``timestamp``
            (current time in milliseconds, rounded to an integer).
        """
        temperature = round(10 + random.random() * 170, 2)
        return {
            'sensorId': f"{sensor_id}_{sensor_reading}",
            'temperature': temperature,
            'status': self._status_for(temperature),
            'timestamp': round(time.time() * 1000),
        }

    def on_start(self):
        """Give this simulated user a random UUID used as sensor-id prefix."""
        self.user_id = str(uuid.uuid4())
        return super().on_start()

    @task
    def send_sensor_value(self):
        """Generate ``BATCH_SIZE`` readings and send them in one request."""
        readings = (
            self.generate_sensor_reading(self.user_id, index)
            for index in range(BATCH_SIZE)
        )
        # Named `records` so the locust `events` import is not shadowed.
        records = [
            {'Data': json.dumps(reading), 'PartitionKey': str(reading['sensorId'])}
            for reading in readings
        ]
        logging.debug("Generated events for Kinesis: %s", records)
        self.client.send(records)