Skip to content

Commit

Permalink
KAZOO-3258: flush the local cache prior to any operation that may upd…
Browse files Browse the repository at this point in the history
…ate a couchdb document
  • Loading branch information
k-anderson committed Jan 24, 2015
1 parent 5bc521f commit d085cf1
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 28 deletions.
17 changes: 9 additions & 8 deletions core/whistle_couch-1.0.0/src/couch_mgr.erl
Original file line number Diff line number Diff line change
Expand Up @@ -529,19 +529,20 @@ cache_db_doc(DbName, DocId, Doc) ->
{'error', _}=E -> E
end.

-spec flush_cache_doc(ne_binary(), ne_binary()) ->
-spec flush_cache_doc(ne_binary(), ne_binary() | wh_json:object()) ->
'ok' |
{'error', 'invalid_db_name'}.
-spec flush_cache_doc(ne_binary(), ne_binary(), wh_proplist()) ->
flush_cache_doc(DbName, Doc) ->
flush_cache_doc(DbName, Doc, []).

-spec flush_cache_doc(ne_binary(), ne_binary() | wh_json:object(), wh_proplist()) ->
'ok' |
{'error', 'invalid_db_name'}.
flush_cache_doc(DbName, DocId) ->
flush_cache_doc(DbName, DocId, []).
flush_cache_doc(DbName, DocId, Options) when ?VALID_DBNAME ->
couch_util:flush_cache_doc(wh_couch_connections:get_server(), DbName, DocId, Options);
flush_cache_doc(DbName, DocId, Options) ->
flush_cache_doc(DbName, Doc, Options) when ?VALID_DBNAME ->
couch_util:flush_cache_doc(DbName, Doc, Options);
flush_cache_doc(DbName, Doc, Options) ->
case maybe_convert_dbname(DbName) of
{'ok', Db} -> flush_cache_doc(Db, DocId, Options);
{'ok', Db} -> flush_cache_doc(Db, Doc, Options);
{'error', _}=E -> E
end.

Expand Down
62 changes: 42 additions & 20 deletions core/whistle_couch-1.0.0/src/couch_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@
%% Doc related
-export([open_cache_doc/4
,cache_db_doc/3
,flush_cache_doc/4
,flush_cache_docs/0, flush_cache_docs/1
,flush_cache_doc/2
,flush_cache_doc/3
,flush_cache_docs/0
,flush_cache_docs/1
,flush_cache_docs/2
,flush_cache_docs/3
,open_doc/4
,lookup_doc_rev/3
,save_doc/4
Expand Down Expand Up @@ -419,16 +423,24 @@ cache_db_doc(DbName, DocId, CacheValue) ->
CacheProps = [{'origin', {'db', DbName, DocId}}],
wh_cache:store_local(?WH_COUCH_CACHE, {?MODULE, DbName, DocId}, CacheValue, CacheProps).

-spec flush_cache_doc(server() | 'undefined', ne_binary() | db(), ne_binary(), wh_proplist()) -> 'ok'.
flush_cache_doc(Server, #db{name=Name}, DocId, Options) ->
flush_cache_doc(Server, wh_util:to_binary(Name), DocId, Options);
flush_cache_doc(_, DbName, DocId, _Options) ->
wh_cache:erase_local(?WH_COUCH_CACHE, {?MODULE, DbName, DocId}).
-spec flush_cache_doc(ne_binary() | db(), ne_binary() | wh_json:object()) -> 'ok'.
flush_cache_doc(#db{name=Name}, Doc) ->
flush_cache_doc(#db{name=Name}, Doc, []).

-spec flush_cache_docs() -> 'ok'.
-spec flush_cache_docs(ne_binary()) -> 'ok'.
-spec flush_cache_doc(ne_binary() | db(), ne_binary() | wh_json:object(), wh_proplist()) -> 'ok'.
flush_cache_doc(#db{name=Name}, Doc, Options) ->
flush_cache_doc(wh_util:to_binary(Name), Doc, Options);
flush_cache_doc(DbName, DocId, _Options) when is_binary(DocId) ->
wh_cache:erase_local(?WH_COUCH_CACHE, {?MODULE, DbName, DocId});
flush_cache_doc(DbName, Doc, Options) ->
flush_cache_doc(DbName, doc_id(Doc), Options).

-spec flush_cache_docs() -> 'ok'.
flush_cache_docs() -> wh_cache:flush_local(?WH_COUCH_CACHE).

-spec flush_cache_docs(ne_binary() | db()) -> 'ok'.
flush_cache_docs(#db{name=Name}) ->
flush_cache_docs(wh_util:to_binary(Name));
flush_cache_docs(DbName) ->
Filter = fun({?MODULE, DbName1, _DocId}=K, _) when DbName1 =:= DbName ->
wh_cache:erase_local(?WH_COUCH_CACHE, K),
Expand All @@ -438,6 +450,17 @@ flush_cache_docs(DbName) ->
_ = wh_cache:filter_local(?WH_COUCH_CACHE, Filter),
'ok'.

-spec flush_cache_docs(ne_binary() | db(), ne_binaries() | wh_json:objects()) -> 'ok'.
flush_cache_docs(Db, Docs) ->
flush_cache_docs(Db, Docs, []).

-spec flush_cache_docs(ne_binary() | db(), ne_binaries() | wh_json:objects(), wh_proplist()) -> 'ok'.
flush_cache_docs(Db, Docs, Options) ->
_ = [flush_cache_doc(Db, Doc, Options)
|| Doc <- Docs
],
'ok'.

-spec open_doc(server(), ne_binary(), ne_binary(), wh_proplist()) ->
{'ok', wh_json:object()} |
couchbeam_error().
Expand Down Expand Up @@ -545,9 +568,7 @@ prepare_doc_for_del(Conn, #db{name=DbName}, Doc) ->
couchbeam_error().
do_ensure_saved(#db{}=Db, Doc, Opts) ->
case do_save_doc(Db, Doc, Opts) of
{'ok', JObj}=Ok ->
_ = maybe_publish_doc(Db, Doc, JObj),
Ok;
{'ok', _}=Ok -> Ok;
{'error', 'conflict'} ->
case do_fetch_rev(Db, doc_id(Doc)) of
{'error', 'not_found'} ->
Expand Down Expand Up @@ -584,9 +605,11 @@ do_fetch_doc(#db{}=Db, DocId, Options) ->
do_save_doc(#db{}=Db, Docs, Options) when is_list(Docs) ->
do_save_docs(Db, Docs, Options);
do_save_doc(#db{}=Db, Doc, Options) ->
case ?RETRY_504(couchbeam:save_doc(Db, maybe_set_docid(Doc), Options)) of
PreparedDoc = maybe_set_docid(Doc),
_ = flush_cache_doc(Db, PreparedDoc),
case ?RETRY_504(couchbeam:save_doc(Db, PreparedDoc, Options)) of
{'ok', JObj}=Ok ->
_ = maybe_publish_doc(Db, Doc, JObj),
_ = maybe_publish_doc(Db, PreparedDoc, JObj),
Ok;
Else -> Else
end.
Expand All @@ -611,6 +634,7 @@ do_save_docs(#db{}=Db, Docs, Options, Acc) ->
case catch(lists:split(?MAX_BULK_INSERT, Docs)) of
{'EXIT', _} ->
PreparedDocs = [maybe_set_docid(D) || D <- Docs],
_ = flush_cache_docs(Db, PreparedDocs),
case ?RETRY_504(couchbeam:save_docs(Db, PreparedDocs, Options)) of
{'ok', JObjs} ->
_ = maybe_publish_docs(Db, PreparedDocs, JObjs),
Expand All @@ -619,6 +643,7 @@ do_save_docs(#db{}=Db, Docs, Options, Acc) ->
end;
{Save, Cont} ->
PreparedDocs = [maybe_set_docid(D) || D <- Save],
_ = flush_cache_docs(Db, PreparedDocs),
case ?RETRY_504(couchbeam:save_docs(Db, PreparedDocs, Options)) of
{'ok', JObjs} ->
_ = maybe_publish_docs(Db, PreparedDocs, JObjs),
Expand Down Expand Up @@ -684,6 +709,7 @@ do_stream_attachment(#db{}=Db, DocId, AName, Caller) ->
{'ok', wh_json:object()} |
couchbeam_error().
do_put_attachment(#db{}=Db, DocId, AName, Contents, Options) ->
_ = flush_cache_doc(Db, DocId),
case ?RETRY_504(couchbeam:put_attachment(Db, DocId, AName, Contents, Options)) of
{'ok', JObj}=Ok ->
maybe_publish_doc(Db, wh_json:from_list([{<<"_id">>, DocId}]), maybe_add_pvt_type(Db, DocId, JObj)),
Expand All @@ -696,6 +722,7 @@ do_put_attachment(#db{}=Db, DocId, AName, Contents, Options) ->
couchbeam_error().
do_del_attachment(#db{}=Db, DocId, AName, Options) ->
Doc = wh_util:to_binary(http_uri:encode(wh_util:to_list(DocId))),
_ = flush_cache_doc(Db, DocId),
case ?RETRY_504(couchbeam:delete_attachment(Db, Doc, AName, Options)) of
{'ok', JObj}=Ok ->
maybe_publish_doc(Db, wh_json:from_list([{<<"_id">>, DocId}]), maybe_add_pvt_type(Db, DocId, JObj)),
Expand Down Expand Up @@ -811,15 +838,10 @@ retry504s(Fun, Cnt) ->
end.

-spec maybe_publish_docs(couchbeam_db(), wh_json:objects(), wh_json:objects()) -> 'ok'.
maybe_publish_docs(#db{name=DbName}=Db, Docs, JObjs) ->
maybe_publish_docs(#db{}=Db, Docs, JObjs) ->
case couch_mgr:change_notice() of
'true' ->
spawn(fun() ->
[wh_cache:erase_local(?WH_COUCH_CACHE
,{?MODULE, wh_util:to_binary(DbName), doc_id(Doc)}
)
|| Doc <- Docs
],
[publish_doc(Db, Doc, JObj)
|| {Doc, JObj} <- lists:zip(Docs, JObjs),
should_publish_doc(Doc)
Expand Down

0 comments on commit d085cf1

Please sign in to comment.