Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
alessiovierti committed Jan 31, 2021
0 parents commit b8dd8a5
Show file tree
Hide file tree
Showing 9 changed files with 976 additions and 0 deletions.
64 changes: 64 additions & 0 deletions .github/workflows/create-release.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
name: Update version and create release

on:
pull_request:
types: [closed]
branches:
- master

jobs:

fetch-version:
if: github.event.pull_request.merged
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Fetch latest release version
id: fetch-latest-release
uses: reloc8/[email protected]
- uses: actions/setup-python@v2
with:
python-version: 3.7
- name: Choose new release version
id: choose-release-version
uses: reloc8/[email protected]
with:
source-branch: ${{ github.event.pull_request.head.ref }}
latest-version: ${{ steps.fetch-latest-release.outputs.latest-release }}
outputs:
new-version: ${{ steps.choose-release-version.outputs.new-version }}

update-version:
needs: fetch-version
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- run: git pull --ff-only
- name: Update version file
run: echo ${{ needs.fetch-version.outputs.new-version }} > version
- name: Push local repository changes
id: push-local-repository-changes
uses: reloc8/[email protected]
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
commit-message: "Version ${{ needs.fetch-version.outputs.new-version }}"
outputs:
commit-hash: ${{ steps.push-local-repository-changes.outputs.commit-hash }}

create-release:
needs: [fetch-version, update-version]
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- run: git pull --ff-only
- name: Create new release
uses: actions/create-release@v1
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
tag_name: ${{ needs.fetch-version.outputs.new-version }}
release_name: ${{ needs.fetch-version.outputs.new-version }}
draft: false
prerelease: false
commitish: ${{ needs.update-version.outputs.commit-hash }}
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
.DS_Store
.idea
.venv
*.egg-info
661 changes: 661 additions & 0 deletions LICENSE.txt

Large diffs are not rendered by default.

27 changes: 27 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
## Development

1. Create a virtual environment:

`$ python3 -m venv .venv`

2. Activate the created environment:

`$ source .venv/bin/activate`

3. Upgrade `pip`:

`$ python3 -m pip install --upgrade pip`

4. Install the requirements:

`$ pip install --upgrade -r requirements.txt`

5. Mark the main package as Sources Root.

## Test

1. Install the testing requirements:

`$ pip install --upgrade -r requirements-test.txt`

2. Run all tests in package `tests`
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-e .
40 changes: 40 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import os
import setuptools

from typing import AnyStr


GITHUB_PERSONAL_ACCESS_TOKEN = os.getenv('GITHUB_PERSONAL_ACCESS_TOKEN')


def private_dependency(personal_access_token: AnyStr,
repo_user: AnyStr, repo_name: AnyStr,
package_name: AnyStr, package_version: AnyStr):
"""Defines a dependency from a private Github repository
:param personal_access_token: Github Personal Access Token
:param repo_user: Dependency repository user
:param repo_name: Dependency repository name
:param package_name: Dependency package name
:param package_version: Dependency repository release (tag)
:return: The dependency specification for the install_requires field
"""

return f'{package_name} @ ' \
f'git+https://{personal_access_token}@github.com/' \
f'{repo_user}/{repo_name}.git/@{package_version}#egg={package_name}-0'


with open('version', 'r') as version:

setuptools.setup(
name='sqs_utils',
version=version.readline(),
author='Alessio Vierti',
packages=setuptools.find_packages(exclude=['tests']),
install_requires=[
'boto3==1.16.53',
'botocore==1.19.54'
],
python_requires='>=3.6'
)
178 changes: 178 additions & 0 deletions sqs_utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import boto3
import hashlib
import logging

from dataclasses import dataclass
from typing import Any, AnyStr, List, Callable, Union, Tuple
from botocore.exceptions import ClientError


@dataclass
class SqsClient:

boto_client: Any
boto_resource: Any


@dataclass(init=False)
class SqsUtils:

logger: logging.Logger

def __init__(self, logger: logging.Logger = logging.getLogger()):

self.logger = logger
self.client = SqsClient(boto_client=boto3.client('sqs'), boto_resource=boto3.resource('sqs'))

def send_batch(self,
batch: List[AnyStr], queue_name: AnyStr,
group_id: AnyStr = 'default',
fun_identify: Callable[[AnyStr], AnyStr] = lambda s: hashlib.sha1(s).hexdigest(),
fun_deduplicate: Callable[[AnyStr], AnyStr] = lambda s: hashlib.sha1(s).hexdigest()) -> bool:
"""Sends a batch of messages to a queue
:param batch: Batch of messages to send
:param queue_name: Name of the queue
:param group_id: SQS group-id - messages within the same group are strictly ordered
:param fun_identify: Callable that returns a message id within this batch
:param fun_deduplicate: Callable that returns a message id within a deduplication interval
:return: True if entire batch was sent
"""

successful = True

if len(batch) > 0:

fifo = queue_name.lower().endswith('.fifo')

queue = self.client.boto_resource.get_queue_by_name(QueueName=queue_name)
entries = [
dict(
MessageBody=message,
MessageGroupId=group_id,
Id=fun_identify(message.encode('utf-8')),
MessageDeduplicationId=fun_deduplicate(message.encode('utf-8'))
) for message in batch
]
if not fifo:
for entry in entries:
entry.pop('MessageDeduplicationId')
entry.pop('MessageGroupId')

successful_writes = queue.send_messages(Entries=entries).get('Successful')
successful = successful_writes is not None and len(successful_writes) == len(batch)

return successful

def receive_one(self,
queue_name: AnyStr,
hide_for_seconds: int = 60 * 60,
poll_for_seconds: int = 20) -> Union[AnyStr, None]:
"""Receives a single message from a queue
:param queue_name: Name of the queue
:param hide_for_seconds: SQS visibility-timeout - how long the message will be hidden from subsequent request
:param poll_for_seconds: SQS wait-time-seconds - maximum time to wait for new messages
:return: The body of the message if present, None otherwise
"""

batch = self.receive_many(
queue_name=queue_name,
hide_for_seconds=hide_for_seconds,
poll_for_seconds=poll_for_seconds,
max_batch_size=1
)

if len(batch) > 0:
return batch[0]
else:
return None

def receive_many(self,
queue_name: AnyStr,
hide_for_seconds: int = 60 * 60,
poll_for_seconds: int = 20,
max_batch_size: int = 10,
with_receipt: bool = False) -> List[AnyStr]:
"""Receives a batch of messages from a queue
:param queue_name: Name of the queue
:param hide_for_seconds: SQS visibility-timeout - how long the message will be hidden from subsequent request
:param poll_for_seconds: SQS wait-time-seconds - maximum time to wait for new messages
:param max_batch_size: Max number of messages to receive
:param with_receipt: If True each element is a couple (body, receipt)
:return: The list of message bodies if any message is present, an empty list otherwise
"""

messages = self.__receive_many(
queue_name=queue_name, hide_for_seconds=hide_for_seconds, poll_for_seconds=poll_for_seconds,
max_batch_size=max_batch_size
)

if with_receipt:
return messages
else:
return [message[0] for message in messages]

def remove_batch(self,
queue_name: AnyStr,
receipts: List[AnyStr]) -> bool:
"""Removes a batch of messages from a queue given their receipts
:param queue_name: Name of the queue
:param receipts: List of message receipts
:return: True if all messages have been removed
"""

if len(receipts) == 0:
return True

try:
response = len(
self.client.boto_client.delete_message_batch(
QueueUrl=self.client.boto_client.get_queue_url(QueueName=queue_name)['QueueUrl'],
Entries=[
dict(
Id=hashlib.sha1(receipt.encode('utf-8')).hexdigest(),
ReceiptHandle=receipt
) for receipt in receipts
]
)
) == len(receipts)
except ClientError as error:
if error.response['Error']['Code'] == 'ReceiptHandleIsInvalid':
response = False
else:
raise

return response

def __receive_many(self,
queue_name: AnyStr,
hide_for_seconds: int = 60 * 60,
poll_for_seconds: int = 20,
max_batch_size: int = 10) -> List[Tuple[AnyStr, AnyStr]]:
"""Receives a batch of messages and their corresponding receipts from a queue
:param queue_name: Name of the queue
:param hide_for_seconds: SQS visibility-timeout - how long the message will be hidden from subsequent request
:param poll_for_seconds: SQS wait-time-seconds - maximum time to wait for new messages
:param max_batch_size: Max number of messages to receive
:return: List of couples (body, receipt) if any message is present, an empty list otherwise
"""

response = self.client.boto_client.receive_message(
QueueUrl=self.client.boto_client.get_queue_url(QueueName=queue_name)['QueueUrl'],
MaxNumberOfMessages=max_batch_size,
VisibilityTimeout=hide_for_seconds,
WaitTimeSeconds=poll_for_seconds
)

messages = response.get('Messages')

if messages is not None and len(messages) > 0:
messages = [(message['Body'], message['ReceiptHandle']) for message in messages]
else:
messages = []

return messages
Empty file added tests/__init__.py
Empty file.
1 change: 1 addition & 0 deletions version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1.0.0

0 comments on commit b8dd8a5

Please sign in to comment.