diff --git a/logbeam/__init__.py b/logbeam/__init__.py index 6a2f874..d4733f3 100644 --- a/logbeam/__init__.py +++ b/logbeam/__init__.py @@ -6,7 +6,7 @@ from cwlogs.push import EventBatchPublisher, EventBatch, LogEvent from cwlogs.threads import BaseThread -from six.moves import queue as Queue +from six.moves import queue logger = logging.getLogger(__name__) @@ -28,17 +28,18 @@ def __init__( log_stream_name, buffer_duration, batch_count, - batch_size): + batch_size, + queue_cls=queue.Queue): super(BatchedCloudWatchSink, self).__init__(Event()) self.logs_service = logs_service self.publisher_stop_flag = Event() self.group_stop_flag = Event() # Incoming LogEvents enter this queue via self.add_event() - self.event_queue = Queue.Queue() + self.event_queue = queue_cls() # Completed EventBatches get put onto this queue, for the # EventBatchPublisher to upload - self.publisher_queue = Queue.Queue() + self.publisher_queue = queue_cls() # The publisher thread, will be started and stopped with this thread self.publisher = EventBatchPublisher( @@ -101,7 +102,7 @@ def _run(self): if add_status == 0: self._send_batch_to_publisher(force=True) self._add_event_to_batch(event) - except Queue.Empty: + except queue.Empty: if self._exit_needed(): self._send_batch_to_publisher(force=True) break @@ -125,7 +126,7 @@ def __init__( log_group_name, log_stream_name, buffer_duration=10000, - batch_count=10, + batch_count=1000, batch_size=1024 * 1024, logs_client=None, *args, **kwargs): diff --git a/logbeam/version.py b/logbeam/version.py index cd7ca49..1a72d32 100644 --- a/logbeam/version.py +++ b/logbeam/version.py @@ -1 +1 @@ -__version__ = '1.0.1' +__version__ = '1.1.0'