Skip to content

Commit 11289ad

Browse files
committed
Database test implementation
1 parent 68751e7 commit 11289ad

File tree

4 files changed

+45
-6
lines changed

4 files changed

+45
-6
lines changed

dags/operators/csv_to_postgres.py

-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ class LoadCsvtoPostgresOperator(BaseOperator):
77
Moves data from a comma seperated file to Postgres
88
"""
99
template_fields = ("file_path",)
10-
ui_color = '#D2B4DE'
1110

1211
@apply_defaults
1312
def __init__(self, postgres_conn_id, table, file_path, *args, **kwargs):

dags/pipeline.py

+38-1
Original file line numberDiff line numberDiff line change
@@ -95,4 +95,41 @@ def _transform_to_csv(infile, outfile):
9595
dag=dag,
9696
)
9797

98-
fetch_data >> transform_to_csv >> normalize_covid_csv >> create_covid_data_table>> load_csv_to_postgres_dwh
98+
# fetch_data >> transform_to_csv >> normalize_covid_csv >> create_covid_data_table>> load_csv_to_postgres_dwh
99+
100+
def _test_transform(infile, outfile):
101+
pathlib.Path("/tmp/data/stg").mkdir(parents=True, exist_ok=True)
102+
data = pd.read_json(infile)
103+
data = data.set_index("date_of_interest")
104+
data = data[['date_of_interest', 'case_count']]
105+
data.to_csv(outfile)
106+
logging.info(f"INFO: Processed {infile} and moved it to {outfile}")
107+
108+
test_transform = PythonOperator(
109+
task_id="test_transform",
110+
python_callable=_test_transform,
111+
dag=dag,
112+
op_kwargs={
113+
"infile": "/tmp/data/raw/covid_data_{{ ds }}.json",
114+
"outfile": "/tmp/data/raw/covid_data_{{ ds }}.csv",
115+
},
116+
117+
)
118+
119+
create_covid_test_table = PostgresOperator(
120+
task_id="create_covid_test_table",
121+
postgres_conn_id="covid_postgres",
122+
sql="sql/create_test.sql",
123+
dag=dag
124+
)
125+
126+
127+
test_data_load = LoadCsvtoPostgresOperator(
128+
task_id='test_data_load',
129+
postgres_conn_id="covid_postgres",
130+
table="covid_test",
131+
file_path="/tmp/data/stg/covid_data_{{ ds }}.csv",
132+
dag=dag,
133+
)
134+
135+
fetch_data >> test_transform >> normalize_covid_csv >> create_covid_test_table >> test_data_load

dags/sql/create_table.sql

+1-4
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,5 @@ CREATE TABLE IF NOT EXISTS covid_data (
6565
incomplete INT
6666
);
6767

68-
CREATE TABLE IF NOT EXISTS covid_test (
69-
date DATE PRIMARY KEY,
70-
case_count INT
71-
)
68+
7269
-- TRUNCATE covid_data;

dags/sql/create_test.sql

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
DROP IF EXISTS TABLE covid_test
2+
3+
CREATE TABLE IF NOT EXISTS covid_test (
4+
date DATE PRIMARY KEY,
5+
case_count INT
6+
)

0 commit comments

Comments
 (0)