Skip to content

Commit b2d9c36

Browse files
committed
Fixed: TSV header getting included and passed to postgres table
1 parent 11289ad commit b2d9c36

File tree

5 files changed

+569
-51
lines changed

5 files changed

+569
-51
lines changed

dags/operators/csv_to_postgres.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
class LoadCsvtoPostgresOperator(BaseOperator):
66
"""
7-
Moves data from a comma seperated file to Postgres
7+
Moves data from a delimiter(tab) seperated file to Postgres
88
"""
99
template_fields = ("file_path",)
1010

@@ -22,6 +22,6 @@ def execute(self, context):
2222
postgres.bulk_load(self.table, self.file_path)
2323
except Exception as err:
2424
self.log.error(err)
25-
return err
25+
raise ValueError(err)
2626
else:
2727
self.log.info(f"Loaded file {self.file_path} into table {self.table}")

dags/pipeline.py

+7-42
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,9 @@ def _fetch_data(outfile):
4646
def _transform_to_csv(infile, outfile):
4747
pathlib.Path("/tmp/data/stg").mkdir(parents=True, exist_ok=True)
4848
data = pd.read_json(infile)
49-
data = data.set_index("date_of_interest")
50-
data.to_csv(outfile)
49+
data['date_of_interest'] = data['date_of_interest']\
50+
.apply(lambda date: date.strip('T00:00:00.000'))
51+
data.to_csv(outfile, index=False, header=False, sep="\t", encoding="utf-8")
5152
logging.info(f"INFO: Processed {infile} and moved it to {outfile}")
5253

5354

@@ -66,7 +67,7 @@ def _transform_to_csv(infile, outfile):
6667
dag=dag,
6768
op_kwargs={
6869
"infile": "/tmp/data/raw/covid_data_{{ ds }}.json",
69-
"outfile": "/tmp/data/raw/covid_data_{{ ds }}.csv",
70+
"outfile": "/tmp/data/stg/covid_data_{{ ds }}.csv",
7071
},
7172
)
7273

@@ -83,53 +84,17 @@ def _transform_to_csv(infile, outfile):
8384
create_covid_data_table = PostgresOperator(
8485
task_id="create_table_covid",
8586
postgres_conn_id="covid_postgres",
86-
sql="sql/create_table.sql",
87+
sql="sql/create_test.sql",
8788
dag=dag,
8889
)
8990

9091
load_csv_to_postgres_dwh = LoadCsvtoPostgresOperator(
9192
task_id='load_to_covid_data_table',
9293
postgres_conn_id="covid_postgres",
93-
table="covid_data",
94+
table="covid_table",
9495
file_path="/tmp/data/stg/covid_data_{{ ds }}.csv",
9596
dag=dag,
9697
)
9798

9899
# fetch_data >> transform_to_csv >> normalize_covid_csv >> create_covid_data_table>> load_csv_to_postgres_dwh
99-
100-
def _test_transform(infile, outfile):
101-
pathlib.Path("/tmp/data/stg").mkdir(parents=True, exist_ok=True)
102-
data = pd.read_json(infile)
103-
data = data.set_index("date_of_interest")
104-
data = data[['date_of_interest', 'case_count']]
105-
data.to_csv(outfile)
106-
logging.info(f"INFO: Processed {infile} and moved it to {outfile}")
107-
108-
test_transform = PythonOperator(
109-
task_id="test_transform",
110-
python_callable=_test_transform,
111-
dag=dag,
112-
op_kwargs={
113-
"infile": "/tmp/data/raw/covid_data_{{ ds }}.json",
114-
"outfile": "/tmp/data/raw/covid_data_{{ ds }}.csv",
115-
},
116-
117-
)
118-
119-
create_covid_test_table = PostgresOperator(
120-
task_id="create_covid_test_table",
121-
postgres_conn_id="covid_postgres",
122-
sql="sql/create_test.sql",
123-
dag=dag
124-
)
125-
126-
127-
test_data_load = LoadCsvtoPostgresOperator(
128-
task_id='test_data_load',
129-
postgres_conn_id="covid_postgres",
130-
table="covid_test",
131-
file_path="/tmp/data/stg/covid_data_{{ ds }}.csv",
132-
dag=dag,
133-
)
134-
135-
fetch_data >> test_transform >> normalize_covid_csv >> create_covid_test_table >> test_data_load
100+
fetch_data >> transform_to_csv >> create_covid_data_table >> load_csv_to_postgres_dwh

dags/sql/create_table.sql

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
DROP TABLE IF EXISTS covid_data;
22

33
CREATE TABLE IF NOT EXISTS covid_data (
4-
date DATE PRIMARY KEY,
4+
date_of_interest DATE PRIMARY KEY,
55
case_count INT,
66
probable_case_count INT,
77
hospitalized_count INT,

dags/sql/create_test.sql

-6
This file was deleted.

0 commit comments

Comments
 (0)