Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/implement staging layer #178

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,17 @@ models:
dado_pessoal: sim
dado_sensivel: sim
dominio: historico_clinico_app

staging:
+materialized: table
+tags: ["staging"]
prontuario_vitacare:
+schema: brutos_prontuario_vitacare_staging
prontuario_vitai:
+schema: brutos_prontuario_vitai_staging
plataforma_smsrio:
+schema: brutos_plataforma_smsrio_staging
estoque_central_tpc:
+schema: brutos_estoque_central_tpc_staging
tests:
rj_sms:
+severity: error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
modules.datetime.date.today() - modules.datetime.timedelta(days=7)
).isoformat() %}

{% set min_date = '2024-10-01' %}
{% set min_date = '2024-12-01' %}

with
unidades_esperadas as (
Expand All @@ -25,7 +25,7 @@ with
'CENTRAL' as unidade_cnes,
'tpc' as fonte,
'posicao' as tipo,
data_particao as data_atualizacao
safe_cast(safe_cast(_data_carga as timestamp) as date) as data_ingestao
from {{ source("brutos_estoque_central_tpc_staging", "estoque_posicao") }}
{% if is_incremental() %}
where data_particao > '{{seven_days_ago}}'
Expand All @@ -35,12 +35,12 @@ with
{% endif %}
),
datas as (
select data_atualizacao
select data_ingestao
{% if is_incremental() %}
from unnest(GENERATE_DATE_ARRAY('{{seven_days_ago}}', current_date())) as data_atualizacao
from unnest(GENERATE_DATE_ARRAY('{{seven_days_ago}}', current_date())) as data_ingestao
{% endif %}
{% if not is_incremental() %}
from unnest(GENERATE_DATE_ARRAY('{{min_date}}', current_date())) as data_atualizacao
from unnest(GENERATE_DATE_ARRAY('{{min_date}}', current_date())) as data_ingestao
{% endif %}
),
entidades as (
Expand All @@ -51,7 +51,7 @@ with
cast(null as string) as unidade_cnes,
'tpc' as fonte,
entidades.tipo as tipo,
cast(datas.data_atualizacao as string) as data_atualizacao
cast(datas.data_ingestao as date) as data_ingestao
from datas, entidades
),
tpc_estoque as (
Expand All @@ -61,7 +61,7 @@ with
),
contagem as (
select
data_atualizacao,
data_ingestao,
fonte,
tipo,
array_agg(distinct unidade_cnes ignore nulls) as unidades_com_dado,
Expand All @@ -83,7 +83,7 @@ with
from contagem
)
select
concat(data_atualizacao, '.', fonte, '.', tipo) as id,
concat(data_ingestao, '.', fonte, '.', tipo) as id,
*
from contagem_com_complemento
order by id
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
modules.datetime.date.today() - modules.datetime.timedelta(days=7)
).isoformat() %}

{% set min_date = '2024-10-01' %}
{% set min_date = '2024-12-01' %}

with
unidades_esperadas as (
Expand All @@ -20,14 +20,17 @@ with
id_cnes as unidade_cnes,
nome_limpo as unidade_nome
from {{ref('dim_estabelecimento')}}
where prontuario_estoque_tem_dado = 'sim' and prontuario_versao = 'vitacare'
where
prontuario_tem = "sim" and
prontuario_estoque_tem_dado = 'sim' and
prontuario_versao = 'vitacare'
),
vitacare_estoque_posicao as (
select
cnesUnidade as unidade_cnes,
'vitacare' as fonte,
'posicao' as tipo,
data_particao as data_atualizacao
safe_cast(safe_cast(_data_carga as timestamp) as date) as data_ingestao
from {{ source("brutos_prontuario_vitacare_staging", "estoque_posicao") }}
{% if is_incremental() %}
where data_particao > '{{seven_days_ago}}'
Expand All @@ -41,7 +44,7 @@ with
cnesUnidade as unidade_cnes,
'vitacare' as fonte,
'movimento' as tipo,
data_particao as data_atualizacao
safe_cast(safe_cast(_data_carga as timestamp) as date) as data_ingestao
from {{ source("brutos_prontuario_vitacare_staging", "estoque_movimento") }}
{% if is_incremental() %}
where data_particao > '{{seven_days_ago}}'
Expand All @@ -50,36 +53,52 @@ with
where data_particao >= '{{min_date}}'
{% endif %}
),
vitacare_vacina as (
select
nCnesUnidade as unidade_cnes,
'vitacare' as fonte,
'vacina' as tipo,
safe_cast(safe_cast(_data_carga as timestamp) as date) as data_ingestao
from {{ source("brutos_prontuario_vitacare_staging", "vacina") }}
{% if is_incremental() %}
where data_particao > '{{seven_days_ago}}'
{% endif %}
{% if not is_incremental() %}
where data_particao >= '{{min_date}}'
{% endif %}
),
datas as (
select data_atualizacao
select data_ingestao
{% if is_incremental() %}
from unnest(GENERATE_DATE_ARRAY('{{seven_days_ago}}', current_date())) as data_atualizacao
from unnest(GENERATE_DATE_ARRAY('{{seven_days_ago}}', current_date())) as data_ingestao
{% endif %}
{% if not is_incremental() %}
from unnest(GENERATE_DATE_ARRAY('{{min_date}}', current_date())) as data_atualizacao
from unnest(GENERATE_DATE_ARRAY('{{min_date}}', current_date())) as data_ingestao
{% endif %}
),
entidades as (
select tipo from unnest(['posicao','movimento']) tipo
select tipo from unnest(['posicao','movimento','vacina']) tipo
),
todos_registros_possiveis as (
select
cast(null as string) as unidade_cnes,
'vitacare' as fonte,
entidades.tipo as tipo,
cast(datas.data_atualizacao as string) as data_atualizacao,
cast(datas.data_ingestao as date) as data_ingestao,
from datas, entidades
),
vitacare_estoque as (
select * from vitacare_estoque_posicao
union all
select * from vitacare_estoque_movimento
union all
select * from vitacare_vacina
union all
select * from todos_registros_possiveis
),
contagem as (
select
data_atualizacao,
data_ingestao,
fonte,
tipo,
array_agg(distinct unidade_cnes ignore nulls) as unidades_com_dado,
Expand All @@ -102,7 +121,7 @@ with
)

select
concat(data_atualizacao, '.', fonte, '.', tipo) as id,
concat(data_ingestao, '.', fonte, '.', tipo) as id,
*
from contagem_com_complemento
order by id
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
modules.datetime.date.today() - modules.datetime.timedelta(days=7)
).isoformat() %}

{% set min_date = '2024-10-01' %}
{% set min_date = '2024-12-01' %}

with
unidades_esperadas as (
Expand All @@ -20,14 +20,17 @@ with
id_cnes as unidade_cnes,
nome_limpo as unidade_nome
from {{ref('dim_estabelecimento')}}
where prontuario_estoque_tem_dado = 'sim' and prontuario_versao = 'vitai'
where
prontuario_tem = "sim" and
prontuario_estoque_tem_dado = 'sim' and
prontuario_versao = 'vitai'
),
vitai_estoque_posicao as (
select
cnes as unidade_cnes,
'vitai' as fonte,
'posicao' as tipo,
data_particao as data_atualizacao
safe_cast(safe_cast(_data_carga as timestamp) as date) as data_ingestao
from {{ source("brutos_prontuario_vitai_staging", "estoque_posicao") }}
{% if is_incremental() %}
where data_particao > '{{seven_days_ago}}'
Expand All @@ -41,7 +44,7 @@ with
cnes as unidade_cnes,
'vitai' as fonte,
'movimento' as tipo,
data_particao as data_atualizacao
safe_cast(safe_cast(_data_carga as timestamp) as date) as data_ingestao
from {{ source("brutos_prontuario_vitai_staging", "estoque_movimento") }}
{% if is_incremental() %}
where data_particao > '{{seven_days_ago}}'
Expand All @@ -51,12 +54,12 @@ with
{% endif %}
),
datas as (
select data_atualizacao
select data_ingestao
{% if is_incremental() %}
from unnest(GENERATE_DATE_ARRAY('{{seven_days_ago}}', current_date())) as data_atualizacao
from unnest(GENERATE_DATE_ARRAY('{{seven_days_ago}}', current_date())) as data_ingestao
{% endif %}
{% if not is_incremental() %}
from unnest(GENERATE_DATE_ARRAY('{{min_date}}', current_date())) as data_atualizacao
from unnest(GENERATE_DATE_ARRAY('{{min_date}}', current_date())) as data_ingestao
{% endif %}
),
entidades as (
Expand All @@ -67,7 +70,7 @@ with
cast(null as string) as unidade_cnes,
'vitai' as fonte,
entidades.tipo as tipo,
cast(datas.data_atualizacao as string) as data_atualizacao,
cast(datas.data_ingestao as date) as data_ingestao,
from datas, entidades
),
vitai_estoque as (
Expand All @@ -79,7 +82,7 @@ with
),
contagem as (
select
data_atualizacao,
data_ingestao,
fonte,
tipo,
array_agg(distinct unidade_cnes ignore nulls) as unidades_com_dado,
Expand All @@ -101,7 +104,7 @@ with
from contagem
)
select
concat(data_atualizacao, '.', fonte, '.', tipo) as id,
concat(data_ingestao, '.', fonte, '.', tipo) as id,
*
from contagem_com_complemento
order by id
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
modules.datetime.date.today() - modules.datetime.timedelta(days=7)
).isoformat() %}

{% set min_date = '2024-10-01' %}
{% set min_date = '2024-12-01' %}

with
unidades_esperadas as (
Expand All @@ -20,14 +20,16 @@ with
id_cnes as unidade_cnes,
nome_limpo as unidade_nome
from {{ref('dim_estabelecimento')}}
where prontuario_versao = 'vitacare'
where
prontuario_tem = "sim" and
prontuario_versao = 'vitacare'
),
vitacare_paciente as (
select
payload_cnes as unidade_cnes,
'vitacare' as fonte,
'paciente' as tipo,
data_particao as data_atualizacao
safe_cast(safe_cast(datalake_loaded_at as timestamp) as date) as data_ingestao
from {{ source("brutos_prontuario_vitacare_staging", "paciente_eventos") }}
{% if is_incremental() %}
where data_particao > '{{seven_days_ago}}'
Expand All @@ -41,7 +43,7 @@ with
payload_cnes as unidade_cnes,
'vitacare' as fonte,
'episodio' as tipo,
data_particao as data_atualizacao
safe_cast(safe_cast(datalake_loaded_at as timestamp) as date) as data_ingestao
from {{ source("brutos_prontuario_vitacare_staging", "atendimento_eventos") }}
{% if is_incremental() %}
where data_particao > '{{seven_days_ago}}'
Expand All @@ -51,12 +53,12 @@ with
{% endif %}
),
datas as (
select data_atualizacao
select data_ingestao
{% if is_incremental() %}
from unnest(GENERATE_DATE_ARRAY('{{seven_days_ago}}', current_date())) as data_atualizacao
from unnest(GENERATE_DATE_ARRAY('{{seven_days_ago}}', current_date())) as data_ingestao
{% endif %}
{% if not is_incremental() %}
from unnest(GENERATE_DATE_ARRAY('{{min_date}}', current_date())) as data_atualizacao
from unnest(GENERATE_DATE_ARRAY('{{min_date}}', current_date())) as data_ingestao
{% endif %}
),
entidades as (
Expand All @@ -67,7 +69,7 @@ with
cast(null as string) as unidade_cnes,
'vitacare' as fonte,
entidades.tipo as tipo,
cast(datas.data_atualizacao as string) as data_atualizacao,
cast(datas.data_ingestao as date) as data_ingestao,
from datas, entidades
),
vitacare_historico_clinico as (
Expand All @@ -79,7 +81,7 @@ with
),
contagem as (
select
data_atualizacao,
data_ingestao,
fonte,
tipo,
array_agg(distinct unidade_cnes ignore nulls) as unidades_com_dado,
Expand All @@ -101,7 +103,7 @@ with
from contagem
)
select
concat(data_atualizacao, '.', fonte, '.', tipo) as id,
concat(data_ingestao, '.', fonte, '.', tipo) as id,
*
from contagem_com_complemento
order by id
Loading