[go: up one dir, main page]

Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Onboard COVID-19 Genome Sequence dataset #460

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Prev Previous commit
Next Next commit
Feat: Onboarding NHTSA dataset — production ready
  • Loading branch information
aurogoogle committed Aug 22, 2022
commit 383d645beab494dcd0b4b774ef10c79eeb5d75dc
Original file line number Diff line number Diff line change
Expand Up @@ -532,5 +532,11 @@
"type": "integer",
"description": "This data element records the number of drunk drivers involved in the crash. 00-99 Number of Drunk Drivers Involved in the Fatal Crash.",
"mode": "NULLABLE"
},
{
"name": "timestamp_of_crash",
"type": "timestamp",
"description": "This data element records the date and time on which the crash occurred.",
"mode": "NULLABLE"
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -544,5 +544,11 @@
"type": "integer",
"description": "This data element records the number of drunk drivers involved in the crash. 00-99 Number of Drunk Drivers Involved in the Fatal Crash.",
"mode": "NULLABLE"
},
{
"name": "timestamp_of_crash",
"type": "timestamp",
"description": "This data element records the date and time on which the crash occurred.",
"mode": "NULLABLE"
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -484,5 +484,11 @@
"type": "integer",
"description": "This data element records the number of drunk drivers involved in the crash. 00-99 Number of Drunk Drivers Involved in the Fatal Crash.",
"mode": "NULLABLE"
},
{
"name": "timestamp_of_crash",
"type": "timestamp",
"description": "This data element records the date and time on which the crash occurred.",
"mode": "NULLABLE"
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -772,5 +772,11 @@
"type": "string",
"description": "This data element identifies the attribute which best describes the location of this non-motorist with respect to the roadway at the time of the crash.",
"mode": "NULLABLE"
},
{
"name": "timestamp_of_crash",
"type": "timestamp",
"description": "This data element records the date and time on which the crash occurred.",
"mode": "NULLABLE"
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -706,5 +706,11 @@
"type": "string",
"description": "This data element identifies the attribute which best describes the location of this non-motorist with respect to the roadway at the time of the crash.",
"mode": "NULLABLE"
},
{
"name": "timestamp_of_crash",
"type": "timestamp",
"description": "This data element records the date and time on which the crash occurred.",
"mode": "NULLABLE"
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -712,5 +712,11 @@
"type": "string",
"description": "This data element identifies any mis-use of the helmet used by this person.",
"mode": "NULLABLE"
},
{
"name": "timestamp_of_crash",
"type": "timestamp",
"description": "This data element records the date and time on which the crash occurred.",
"mode": "NULLABLE"
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -724,5 +724,11 @@
"type": "string",
"description": "This element captures the completed/finished body class for an incomplete vehicle. An incomplete vehicle is completed by a final stage manufacturer. The intent of this data element is to capture the body class for incomplete vehicles when they are finished for road-use.",
"mode": "NULLABLE"
},
{
"name": "timestamp_of_crash",
"type": "timestamp",
"description": "This data element records the date and time on which the crash occurred.",
"mode": "NULLABLE"
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -1120,5 +1120,11 @@
"type": "string",
"description": "This data element records whether the driver was drinking and is derived from data elements in the Vehicle and Person data files. 0 No Drinking 1 Drinking -- Unknown",
"mode": "NULLABLE"
},
{
"name": "timestamp_of_crash",
"type": "timestamp",
"description": "This data element records the date and time on which the crash occurred.",
"mode": "NULLABLE"
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -1156,5 +1156,11 @@
"type": "string",
"description": "This data element records the vehicle identification number (VIN) of any trailing units of a combination vehicle.",
"mode": "NULLABLE"
},
{
"name": "timestamp_of_crash",
"type": "timestamp",
"description": "This data element records the date and time on which the crash occurred.",
"mode": "NULLABLE"
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -1180,5 +1180,11 @@
"type": "string",
"description": "This data element records the vehicle identification number (VIN) of any trailing units of a combination vehicle.",
"mode": "NULLABLE"
},
{
"name": "timestamp_of_crash",
"type": "timestamp",
"description": "This data element records the date and time on which the crash occurred.",
"mode": "NULLABLE"
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -1204,5 +1204,11 @@
"type": "string",
"description": "This element identifies the gross vehicle weight rating of any trailing units as identified by the manufacturer in the vehicle’s VIN.",
"mode": "NULLABLE"
},
{
"name": "timestamp_of_crash",
"type": "timestamp",
"description": "This data element records the date and time on which the crash occurred.",
"mode": "NULLABLE"
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ def execute_pipeline(
field_separator=",",
rename_mappings_list=rename_mappings_list,
input_dtypes=input_dtypes,
input_csv_headers=input_csv_headers
input_csv_headers=input_csv_headers,
pipeline_name = pipeline_name,
process_year = process_year
)
if os.path.exists(target_file):
upload_file_to_gcs(
Expand Down Expand Up @@ -165,7 +167,9 @@ def process_source_file(
field_separator: str,
rename_mappings_list: dict,
input_dtypes: dict,
input_csv_headers: typing.List[str]
input_csv_headers: typing.List[str],
pipeline_name: str,
process_year : int
) -> pd.DataFrame:
unpack_file(source_file, source_file_unzip_dir, "zip")
logging.info(f"Opening source file {source_file}")
Expand All @@ -191,7 +195,9 @@ def process_source_file(
input_dtypes=input_dtypes,
target_file=target_file,
chunk_number=chunk_number,
rename_mappings_list=rename_mappings_list
rename_mappings_list=rename_mappings_list,
pipeline_name = pipeline_name,
process_year = process_year
)
data = []
chunk_number += 1
Expand All @@ -202,7 +208,9 @@ def process_source_file(
input_dtypes=input_dtypes,
target_file=target_file,
chunk_number=chunk_number,
rename_mappings_list=rename_mappings_list
rename_mappings_list=rename_mappings_list,
pipeline_name = pipeline_name,
process_year = process_year
)


Expand All @@ -212,7 +220,9 @@ def process_dataframe_chunk(
input_dtypes: dict,
target_file: str,
chunk_number: int,
rename_mappings_list: dict
rename_mappings_list: dict,
pipeline_name : str,
process_year : int
) -> None:
df = pd.DataFrame(
data,
Expand All @@ -227,7 +237,9 @@ def process_dataframe_chunk(
target_file_batch=target_file_batch,
target_file=target_file,
skip_header=(not chunk_number == 1),
rename_headers_list=rename_mappings_list
rename_headers_list=rename_mappings_list,
pipeline_name = pipeline_name,
process_year =process_year
)


Expand All @@ -246,20 +258,36 @@ def process_chunk(
target_file_batch: str,
target_file: str,
skip_header: bool,
rename_headers_list: dict
rename_headers_list: dict,
pipeline_name : str,
process_year :int
) -> None:
logging.info(f"Processing batch file {target_file_batch}")
df = rename_headers(df, rename_headers_list)
# pipeline_name= 'Accident'
# if pipeline_name == 'Accident':
# create_new_timestamp_column(df)
save_to_new_file(df, file_path=str(target_file_batch), sep="|")
append_batch_file(target_file_batch, target_file, skip_header, not (skip_header))
new_pipeline_name = (pipeline_name.split('-')[1]).lower().strip()
if new_pipeline_name in ["accident"]:
create_new_timestamp_column(df,new_pipeline_name,process_year)
save_to_new_file(df, file_path=str(target_file_batch), sep="|")
append_batch_file(target_file_batch, target_file, skip_header, not (skip_header))
elif new_pipeline_name in ["person","vehicle"]:
create_new_timestamp_column(df,new_pipeline_name,process_year)
else:
save_to_new_file(df, file_path=str(target_file_batch), sep="|")
append_batch_file(target_file_batch, target_file, skip_header, not (skip_header))

logging.info(f"Processing batch file {target_file_batch} completed")

def create_new_timestamp_column(df: pd.DataFrame):
df['timestamp_of_crash']=df['year_of_crash'].apply(lambda x : str(x))+'-'+df['month_of_crash'].apply(lambda x : "0"+str(x) if len(str(x))==1 else str(x))+'-'+df['day_of_crash'].apply(lambda x : "0"+str(x) if len(str(x))==1 else str(x))+' '+df['hour_of_crash'].apply(lambda x : "0"+str(x) if len(str(x))==1 else str(x))+':'+df['minute_of_crash'].apply(lambda x : "0"+str(x) if len(str(x))==1 else str(x))+':'+'00'+' UTC'
return df
def create_new_timestamp_column(
    df: pd.DataFrame, new_pipeline_name: str, process_year: int
) -> pd.DataFrame:
    """Add a ``timestamp_of_crash`` column ("YYYY-MM-DD HH:MM:00 UTC") to *df*.

    Rows whose hour/minute parts are out of range are dropped first. The
    ``accident`` file carries its own ``year_of_crash`` column; the
    ``person``/``vehicle`` files do not, so their year comes from
    *process_year* (the year of the pipeline run being processed).

    Args:
        df: chunk of the source file; mutated in place (rows dropped,
            column added) and also returned for convenience.
        new_pipeline_name: lower-cased pipeline suffix, e.g. "accident",
            "person", "vehicle"; any other value leaves *df* untouched.
        process_year: four-digit year used when *df* has no year column.

    Returns:
        The same ``DataFrame`` object, mutated in place.
    """
    if new_pipeline_name not in ("accident", "person", "vehicle"):
        return df

    # Drop rows whose time-of-day parts cannot form a valid timestamp.
    # NOTE(review): hour 24 still passes this filter and produces an
    # unparseable "24:MM" value — confirm whether the threshold should
    # be 23 (FARS codes unknown hours as 99, which this does drop).
    df.drop(df[df["hour_of_crash"].apply(lambda v: int(v)) > 24].index, inplace=True)
    df.drop(df[df["minute_of_crash"].apply(lambda v: int(v)) > 59].index, inplace=True)

    def _two_digits(column: str) -> pd.Series:
        # Zero-pad each value to at least two characters ("3" -> "03").
        return df[column].apply(lambda v: str(v).zfill(2))

    if new_pipeline_name == "accident":
        year = df["year_of_crash"].apply(lambda v: str(v))
    else:
        # person/vehicle files: year is constant for the whole run.
        year = str(process_year)

    df["timestamp_of_crash"] = (
        year
        + "-" + _two_digits("month_of_crash")
        + "-" + _two_digits("day_of_crash")
        + " " + _two_digits("hour_of_crash")
        + ":" + _two_digits("minute_of_crash")
        + ":00 UTC"
    )
    return df

def unpack_file(infile: str, dest_path: str, compression_type: str = "zip") -> None:
if compression_type == "zip":
Expand Down