Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Maintain the pool of workers in its own thread for more reliability and flexibility #2191

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 60 additions & 9 deletions changedetectionio/flask_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -1547,6 +1547,7 @@ def highlight_submit_ignore_url():
# @todo handle ctrl break
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start()
threading.Thread(target=notification_runner).start()
threading.Thread(target=thread_maintain_worker_thread_pool).start()

# Check for new release version, but not when running in test/build or pytest
if not os.getenv("GITHUB_REF", False) and not config.get('disable_checkver') == True:
Expand Down Expand Up @@ -1629,23 +1630,73 @@ def notification_runner():
# Trim the log length
notification_debug_log = notification_debug_log[-100:]


def thread_maintain_worker_thread_pool():
from changedetectionio import update_worker

n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

problem here is that it will prefer this var if it is set


while not app.config.exit.is_set():
needed_threads = n_workers if not running_update_threads else 0
how_many_running_now = 0
dead_threads = []

for i, t in enumerate(running_update_threads):
if t.is_alive():
how_many_running_now += 1
else:
dead_threads.append(i)

for i in dead_threads:
del running_update_threads[i]

for _ in range(needed_threads - how_many_running_now):
logger.info("Adding new worker thread")
new_worker = update_worker.update_worker(update_q, notification_q, app, datastore)
running_update_threads.append(new_worker)
new_worker.start()

app.config.exit.wait(2)



def thread_maintain_worker_thread_pool():
from changedetectionio import update_worker

logger.info("Starting thread pool worker maintainer thread")
n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))

while not app.config.exit.is_set():
needed_threads = n_workers if not running_update_threads else 0
how_many_running_now = 0
dead_threads = []

for i, t in enumerate(running_update_threads):
if t.is_alive():
how_many_running_now += 1
else:
dead_threads.append(i)

for i in dead_threads:
del running_update_threads[i]

for _ in range(needed_threads - how_many_running_now):
logger.info("Adding new worker thread")
new_worker = update_worker.update_worker(update_q, notification_q, app, datastore)
running_update_threads.append(new_worker)
new_worker.start()

app.config.exit.wait(2)

# Thread runner to check every minute, look for new watches to feed into the Queue.
def ticker_thread_check_time_launch_checks():
import random
from changedetectionio import update_worker

proxy_last_called_time = {}

recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 20))
logger.debug(f"System env MINIMUM_SECONDS_RECHECK_TIME {recheck_time_minimum_seconds}")

# Spin up Workers that do the fetching
# Can be overriden by ENV or use the default settings
n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
for _ in range(n_workers):
new_worker = update_worker.update_worker(update_q, notification_q, app, datastore)
running_update_threads.append(new_worker)
new_worker.start()

while not app.config.exit.is_set():

Expand Down Expand Up @@ -1728,7 +1779,7 @@ def ticker_thread_check_time_launch_checks():
priority = int(time.time())
logger.debug(
f"> Queued watch UUID {uuid} "
f"last checked at {watch['last_checked']} "
f"last checked at {watch['last_checked']} ({seconds_since_last_recheck} seconds ago!) recheck min was :{recheck_time_minimum_seconds} "
f"queued at {now:0.2f} priority {priority} "
f"jitter {watch.jitter_seconds:0.2f}s, "
f"{now - watch['last_checked']:0.2f}s since last checked")
Expand Down
Loading