Skip to content

Commit

Permalink
[4.3] HELP-43867: rework ledger/modb/services at month's end (#5829)
Browse files Browse the repository at this point in the history
* [4.3] HELP-43867: rework ledger/modb/services at month's end
  • Loading branch information
jamesaimonetti authored and lazedo committed May 31, 2019
1 parent da82c40 commit ccf681e
Show file tree
Hide file tree
Showing 54 changed files with 799 additions and 224 deletions.
4 changes: 2 additions & 2 deletions applications/braintree/src/braintree_maintenance.erl
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ sync_account_services_payments_info(AccountId, Services) ->
{'error', _} -> io:format("failed to update service doc~n", [])
end;
#bt_api_error{errors = Errors} ->
io:format("braintree failed with ~p~n", format_errors(Errors))
io:format("braintree failed with ~p~n", [format_errors(Errors)])
catch
'throw':{_, ErrJObj} -> io:format("braintree failed with ~s ~n", [kz_json:encode(ErrJObj)])
end.
Expand All @@ -66,7 +66,7 @@ sync_payment_info(AccountId, CardId) ->
#bt_card{} ->
io:format("card not found~n");
#bt_api_error{errors = Errors} ->
io:format("braintree failed with ~p~n", format_errors(Errors))
io:format("braintree failed with ~p~n", [format_errors(Errors)])
catch
'throw':{_, ErrJObj} -> io:format("braintree failed with ~s ~n", [kz_json:encode(ErrJObj)])
end.
48 changes: 48 additions & 0 deletions applications/crossbar/priv/api/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -34505,6 +34505,43 @@
},
"type": "object"
},
"system_config.tasks.ledger_rollover": {
"description": "Schema for tasks.ledger_rollover system_config",
"properties": {
"refresh_in_parallel": {
"default": 50,
"description": "tasks ledger_rollover refresh_in_parallel",
"type": "integer"
},
"refresh_view_enabled": {
"default": false,
"description": "tasks ledger_rollover refresh_view_enabled",
"type": "boolean"
},
"rollover_in_parallel": {
"default": 10,
"description": "How many accounts to rollover per-pass (in parallel)",
"type": "integer"
}
},
"type": "object"
},
"system_config.tasks.modb_creation": {
"description": "Schema for tasks.modb_creation system_config",
"properties": {
"create_in_parallel": {
"default": 1,
"description": "How many accounts to process per pass (in parallel).",
"type": "integer"
},
"creation_day": {
"default": 28,
"description": "Which day of the month (of current month) to create next month's MODBs on",
"type": "integer"
}
},
"type": "object"
},
"system_config.tasks.notify_resend": {
"description": "Schema for tasks.notify_resend system_config",
"properties": {
Expand Down Expand Up @@ -34583,6 +34620,17 @@
},
"type": "object"
},
"system_config.tasks.services_rollover": {
"description": "Schema for tasks.services_rollover system_config",
"properties": {
"rollover_in_parallel": {
"default": 10,
"description": "How many accounts to rollover services for per pass (in parallel)",
"type": "integer"
}
},
"type": "object"
},
"system_config.teletype": {
"description": "Schema for teletype system_config",
"properties": {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"$schema": "http://json-schema.org/draft-04/schema#",
"_id": "system_config.tasks.ledger_rollover",
"description": "Schema for tasks.ledger_rollover system_config",
"properties": {
"refresh_in_parallel": {
"default": 50,
"description": "tasks ledger_rollover refresh_in_parallel",
"type": "integer"
},
"refresh_view_enabled": {
"default": false,
"description": "tasks ledger_rollover refresh_view_enabled",
"type": "boolean"
},
"rollover_in_parallel": {
"default": 10,
"description": "How many accounts to rollover per-pass (in parallel)",
"type": "integer"
}
},
"type": "object"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"$schema": "http://json-schema.org/draft-04/schema#",
"_id": "system_config.tasks.modb_creation",
"description": "Schema for tasks.modb_creation system_config",
"properties": {
"create_in_parallel": {
"default": 1,
"description": "How many accounts to process per pass (in parallel).",
"type": "integer"
},
"creation_day": {
"default": 28,
"description": "Which day of the month (of current month) to create next month's MODBs on",
"type": "integer"
}
},
"type": "object"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"$schema": "http://json-schema.org/draft-04/schema#",
"_id": "system_config.tasks.services_rollover",
"description": "Schema for tasks.services_rollover system_config",
"properties": {
"rollover_in_parallel": {
"default": 10,
"description": "How many accounts to rollover services for per pass (in parallel)",
"type": "integer"
}
},
"type": "object"
}
10 changes: 7 additions & 3 deletions applications/crossbar/src/crossbar_services.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ maybe_dry_run(Context, CurrentJObj, ProposedJObj) ->
Services = kz_services:fetch(AuthAccountId),
Updated = kz_services:set_updates(Services
,AccountId
,CurrentJObj
,ProposedJObj
,kz_services:to_billables(CurrentJObj)
,kz_services:to_billables(ProposedJObj)
),
Quotes = kz_services_invoices:create(Updated),
HasAdditions = kz_services_invoices:has_billable_additions(Quotes),
Expand Down Expand Up @@ -125,7 +125,11 @@ update_subscriptions(_Context, _CurrentJObj, _ProposedJObj, 'undefined') ->
update_subscriptions(Context, CurrentJObj, ProposedJObj, AccountId) ->
AuditLog = audit_log(Context),
lager:info("committing updates to ~s", [AccountId]),
_ = kz_services:commit_updates(AccountId, CurrentJObj, ProposedJObj, AuditLog),
_ = kz_services:commit_updates(AccountId
,CurrentJObj
,ProposedJObj
,AuditLog
),
'ok'.

%%------------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion applications/crossbar/src/modules/cb_services.erl
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ load_audit_logs(Context) ->
%% @doc
%% @end
%%------------------------------------------------------------------------------
-spec override_plans(cb_context:context(), kz_services:serivces()) -> kz_services:services().
-spec override_plans(cb_context:context(), kz_services:services()) -> kz_services:services().
override_plans(Context, Services) ->
case kz_json:get_ne_json_value(<<"overrides">>, cb_context:req_data(Context)) of
'undefined' -> Services;
Expand Down
4 changes: 2 additions & 2 deletions applications/ecallmgr/src/ecallmgr_call_control.erl
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
,fetch_id :: kz_term:api_ne_binary()
,controller_q :: kz_term:api_ne_binary()
,control_q :: kz_term:api_ne_binary()
,initial_ccvs :: kz_json:object()
,initial_ccvs :: kz_term:api_object()
,node_down_tref :: kz_term:api_reference()
}).
-type state() :: #state{}.
Expand Down Expand Up @@ -972,7 +972,7 @@ queue_insert_fun('head') ->
%% @end
%%------------------------------------------------------------------------------
%% See Noop documentation for Filter-Applications to get an idea of this function's purpose
-spec maybe_filter_queue(kz_json:api_objects(), queue:queue()) -> queue:queue().
-spec maybe_filter_queue(kz_term:api_objects(), queue:queue()) -> queue:queue().
maybe_filter_queue('undefined', CommandQ) -> CommandQ;
maybe_filter_queue([], CommandQ) -> CommandQ;
maybe_filter_queue([AppName|T]=Apps, CommandQ) when is_binary(AppName) ->
Expand Down
21 changes: 21 additions & 0 deletions applications/tasks/doc/ledger_rollover.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Ledger Rollover Task

The ledger rollover task, at the start of the month, walks the accounts and rolls over their ledger amount from the previous month to the new MODB.

There is also a task to refresh the primary ledgers view, `totals_by_source`, to make sure the index isn't too far behind when the monthly rollover task occurs.

## Rollover

Rollover sums up the ledgers of the previous month's MODB and creates a new ledger in the now-current MODB for that amount.

Set `tasks.ledger_rollover`'s `rollover_in_parallel` to control how many accounts to roll over at a time.

## Refresh

Refresh the view index for the `ledgers/totals_by_source` view to process any docs.

For reference, running the rollover on an unindexed MODB with 12,000 docs (318 of which are ledger docs) took ~6 seconds. Running the same view on an up-to-date index took 70 milliseconds. Similar to CDRs, this is a good one to enable.

Set `tasks.ledger_rollover`'s `refresh_view_enabled` to `true` to enable the job (performed daily).

Set `tasks.ledger_rollover`'s `refresh_in_parallel` to control how many MODBs to refresh at a time.
42 changes: 42 additions & 0 deletions applications/tasks/doc/modb_creation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# MODB Creation Task

Create account MODBs ahead of time.

## Metered creation

On the configured day of the month (the 28th by default), the task will calculate the seconds left until the start of the next month. Based on that time range and the number of accounts, MODB creation will be spaced across that time period.

For instance:

```
% May 28 2019 = 63726220800
% June 1 2019 = 63726566400
DiffS = (63726566400 - 63726220800) = 345600s
Accounts = 1000
DelayPerAccount = (DiffS / Accounts) = 345600 / 1000 = 345s
```

An MODB will be created every 345s until the end of the month.

## Manually run

If you need to manually run the task, you can use SUP to accomplish it:

```
sup kt_modb_creation create_modbs
```

This will spawn a worker and return immediately with the PID of that worker.

## System Config

### Date of starting creation

System admins can select which day of the month to start MODB creation for the next month. Set `tasks.modb_creation` doc's `creation_day` to a number in `1..28` to ensure each month will trigger creation.

### Parallel Creation

System admins can speed this up by increasing the `tasks.modb_creation` doc's `create_in_parallel` (default is 1). If set to `10` for instance, the delay becomes `(DiffS / InParallel) / Accounts` or `34s` - 10 MODBs created every 34 seconds.
5 changes: 5 additions & 0 deletions applications/tasks/doc/services_rollover.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Services Rollover Task

Handles rolling over service totals to the new MODB.

Set `tasks.services_rollover`'s `rollover_in_parallel` to control how many accounts to process at the same time.
73 changes: 52 additions & 21 deletions applications/tasks/src/kz_tasks_trigger.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@
,terminate/2
]).

-ifdef(TEST).
-export([seconds_until_next_day/1
,seconds_until_next_hour/1
,seconds_until_next_minute/1
]).
-endif.

-include("tasks.hrl").

-define(SERVER, {'via', 'kz_globals', ?MODULE}).
Expand All @@ -33,7 +40,8 @@
-type state() :: #state{}.

-define(CLEANUP_TIMER
,kapps_config:get_pos_integer(?CONFIG_CAT, <<"browse_dbs_interval_s">>, ?SECONDS_IN_DAY)).
,kapps_config:get_pos_integer(?CONFIG_CAT, <<"browse_dbs_interval_s">>, ?SECONDS_IN_DAY)
).

%%%=============================================================================
%%% API
Expand Down Expand Up @@ -115,26 +123,26 @@ handle_cast(_Msg, State) ->
%% @end
%%------------------------------------------------------------------------------
-spec handle_info(any(), state()) -> kz_types:handle_info_ret_state(state()).
handle_info({'EXIT', _Pid, normal}, State) ->
handle_info({'EXIT', _Pid, 'normal'}, State) ->
lager:debug("job ~p terminated normally", [_Pid]),
{noreply, State};
{'noreply', State};
handle_info({'EXIT', _Pid, _Reason}, State) ->
lager:error("job ~p crashed: ~p", [_Pid, _Reason]),
{noreply, State};
{'noreply', State};

handle_info({timeout, Ref, _Msg}, #state{minute_ref = Ref}=State) ->
handle_info({'timeout', Ref, _Msg}, #state{minute_ref = Ref}=State) ->
spawn_jobs(Ref, ?TRIGGER_MINUTELY),
{'noreply', State#state{minute_ref = minute_timer()}};

handle_info({timeout, Ref, _Msg}, #state{hour_ref = Ref}=State) ->
handle_info({'timeout', Ref, _Msg}, #state{hour_ref = Ref}=State) ->
spawn_jobs(Ref, ?TRIGGER_HOURLY),
{'noreply', State#state{hour_ref = hour_timer()}};

handle_info({timeout, Ref, _Msg}, #state{day_ref = Ref}=State) ->
handle_info({'timeout', Ref, _Msg}, #state{day_ref = Ref}=State) ->
spawn_jobs(Ref, ?TRIGGER_DAILY),
{'noreply', State#state{day_ref = day_timer()}};

handle_info({timeout, Ref, _Msg}, #state{browse_dbs_ref = Ref}=State) ->
handle_info({'timeout', Ref, _Msg}, #state{browse_dbs_ref = Ref}=State) ->
_Pid = kz_util:spawn(fun browse_dbs_for_triggers/1, [Ref]),
lager:debug("cleaning up in ~p(~p)", [_Pid, Ref]),
{'noreply', State};
Expand Down Expand Up @@ -172,38 +180,60 @@ code_change(_OldVsn, State, _Extra) ->
%%------------------------------------------------------------------------------
-spec minute_timer() -> reference().
minute_timer() ->
erlang:start_timer(?MILLISECONDS_IN_MINUTE, self(), ok).
erlang:start_timer(seconds_until_next_minute() * ?MILLISECONDS_IN_SECOND, self(), 'ok').

-spec seconds_until_next_minute() -> 0..?SECONDS_IN_MINUTE.
seconds_until_next_minute() ->
seconds_until_next_minute(calendar:universal_time()).

-spec seconds_until_next_minute(kz_time:datetime()) -> 0..?SECONDS_IN_MINUTE.
seconds_until_next_minute({_, {_H, _M, S}}) ->
?SECONDS_IN_MINUTE - S.

-spec hour_timer() -> reference().
hour_timer() ->
erlang:start_timer(?MILLISECONDS_IN_HOUR, self(), ok).
erlang:start_timer(seconds_until_next_hour() * ?MILLISECONDS_IN_SECOND, self(), 'ok').

-spec seconds_until_next_hour() -> 0..?SECONDS_IN_HOUR.
seconds_until_next_hour() ->
seconds_until_next_hour(calendar:universal_time()).

-spec seconds_until_next_hour(kz_time:datetime()) -> 0..?SECONDS_IN_HOUR.
seconds_until_next_hour({_, {_H, M, S}}) ->
((?MINUTES_IN_HOUR - M) * ?SECONDS_IN_MINUTE) - S.

-spec day_timer() -> reference().
day_timer() ->
erlang:start_timer(?MILLISECONDS_IN_DAY, self(), ok).
erlang:start_timer(seconds_until_next_day() * ?MILLISECONDS_IN_SECOND, self(), 'ok').

-spec seconds_until_next_day() -> 0..?SECONDS_IN_DAY.
seconds_until_next_day() ->
seconds_until_next_day(calendar:universal_time()).

-spec seconds_until_next_day(kz_time:datetime()) -> 0..?SECONDS_IN_DAY.
seconds_until_next_day({_, {H, M, S}}) ->
((?HOURS_IN_DAY - H) * ?SECONDS_IN_HOUR) - (M * ?SECONDS_IN_MINUTE) - S.

-spec browse_dbs_timer() -> reference().
browse_dbs_timer() ->
Expiry = ?CLEANUP_TIMER,
lager:debug("starting cleanup timer for ~b s", [Expiry]),
erlang:start_timer(Expiry * ?MILLISECONDS_IN_SECOND, self(), ok).

erlang:start_timer(Expiry * ?MILLISECONDS_IN_SECOND, self(), 'ok').

-spec spawn_jobs(reference(), kz_term:ne_binary()) -> ok.
-spec spawn_jobs(reference(), kz_term:ne_binary()) -> 'ok'.
spawn_jobs(Ref, Binding) ->
CallId = make_callid(Ref, Binding),
_Pid = erlang:spawn_link(fun () ->
_ = kz_util:put_callid(CallId),
tasks_bindings:map(Binding, [])
end),
kz_util:put_callid(make_callid(Ref, Binding)),
_Pid = kz_util:spawn_link(fun tasks_bindings:map/2, [Binding, []]),
kz_util:put_callid(?MODULE),
lager:debug("binding ~s triggered ~p via ~p", [Binding, _Pid, Ref]).

-spec make_callid(reference(), kz_term:ne_binary()) -> kz_term:ne_binary().
make_callid(Ref, Binding) ->
Key = lists:last(binary:split(Binding, <<$.>>, [global])),
Key = lists:last(binary:split(Binding, <<$.>>, ['global'])),
Id = ref_to_id(Ref),
<<"task_", Key/binary, "_", Id/binary>>.

-spec ref_to_id(reference()) -> kz_term:ne_binary().
ref_to_id(Ref) ->
Bin = list_to_binary(io_lib:format("~p", [Ref])),
Start = <<"#Ref<">>,
Expand Down Expand Up @@ -233,7 +263,8 @@ browse_dbs_for_triggers(Ref) ->
'ok' = kt_compaction_reporter:start_tracking_job(self(), node(), CallId, Sorted),
F = fun({Db, _Sizes}, Ctr) ->
lager:debug("compacting ~p out of ~p dbs (~p remaining)",
[Ctr, TotalSorted, (TotalSorted - Ctr)]),
[Ctr, TotalSorted, (TotalSorted - Ctr)]
),
cleanup_pass(Db),
Ctr + 1
end,
Expand Down
Loading

0 comments on commit ccf681e

Please sign in to comment.