
Commit d26b7ef

Merge pull request #649 from NEONScience/enviroscan

Enviroscan

2 parents 6d277da + b9576ea, commit d26b7ef

37 files changed: 721 additions & 51 deletions
New workflow: DEV-enviroscan-site-list

Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
+name: DEV-enviroscan-site-list
+on:
+  push:
+    branches:
+      - 'master'
+    paths:
+      - 'pipe/enviroscan/site-list*.json'
+  workflow_dispatch: {} # Allows trigger of workflow from web interface
+jobs:
+  put_files:
+    runs-on: arc-neon-gke
+    #runs-on: ubuntu-latest
+    env:
+      PACHD_ADDRESS: grpcs://pachyderm-dev.transitions-nonprod.gcp.neoninternal.org:443
+      PACH_TOKEN: ${{ secrets.RepoOwnerPachydermDev }}
+      REPO: enviroscan_site_list # Pachyderm repo
+      BRANCH: master
+      IN_PATHS: 'pipe/enviroscan/site-list.json' # Comma-separated list (no spaces) of one or more paths or directories. Length must match OUT_PATHS. If a directory, all files in it will be placed in Pachyderm at the corresponding entry of OUT_PATHS.
+      OUT_PATHS: 'site-list.json' # Comma-separated list (no spaces) of corresponding path(s) at which to place the file(s) in Pachyderm. Must be the same length as IN_PATHS. If the corresponding entry in IN_PATHS is a file, specify a file path; if it is a directory, specify a directory path.
+    steps:
+      - uses: actions/checkout@v4
+      - run: ls -la
+
+      - name: Put file
+        uses: ./.github/actions/put-files
+        with:
+          pachd_address: ${{ env.PACHD_ADDRESS }}
+          pach_token: ${{ env.PACH_TOKEN }}
+          repo_name: ${{ env.REPO }}
+          branch_name: ${{ env.BRANCH }}
+          in_paths: ${{ env.IN_PATHS }}
+          out_paths: ${{ env.OUT_PATHS }}
+
+
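The put-files action itself is referenced but not shown in this commit. As a rough sketch of the positional IN_PATHS/OUT_PATHS contract the comments above describe — entry i of IN_PATHS lands at entry i of OUT_PATHS in the Pachyderm repo — illustrative code only, not the action's implementation:

# Illustrative only: how a put-files-style action might pair IN_PATHS with OUT_PATHS.
import os

in_paths = os.environ['IN_PATHS'].split(',')    # e.g. 'pipe/enviroscan/site-list.json'
out_paths = os.environ['OUT_PATHS'].split(',')  # e.g. 'site-list.json'
if len(in_paths) != len(out_paths):
    raise ValueError('IN_PATHS and OUT_PATHS must have the same length')
for src, dest in zip(in_paths, out_paths):
    print(f'would put {src} -> {dest} in the Pachyderm repo')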
New workflow: DEV-enviroscan-update-dag

Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
+name: DEV-enviroscan-update-dag
+on:
+  push:
+    branches:
+      - 'master'
+    paths:
+      - 'pipe/enviroscan/*.yaml'
+      - 'pipe/enviroscan/pipe_list_enviroscan.txt'
+  workflow_dispatch: {} # Allows trigger of workflow from web interface
+
+jobs:
+  # -------------------------------------------------------------
+  # Using GitHub's API is not supported for push events
+  # -------------------------------------------------------------
+  #
+  # ----------------------------------------------------------------------------------------------
+  # Using local .git history
+  # ----------------------------------------------------------------------------------------------
+  # Event `push`: Compare the preceding remote commit -> to the current commit of the main branch
+  # ----------------------------------------------------------------------------------------------
+
+  changed_files:
+    runs-on: ubuntu-latest # windows-latest || macos-latest
+    outputs:
+      # Use this changed_file_list if you plan to use get-changed-files-action
+      changed_file_list: ${{ steps.changed-files-action.outputs.changed_file_list }}
+    steps:
+      - uses: actions/checkout@v4
+        with:
+          fetch-depth: 0 # OR "2" -> To retrieve the preceding commit.
+
+      # Using get-changed-files-action
+      - name: Get changed files action
+        id: changed-files-action
+        uses: ./.github/actions/get-changed-files
+
+  update_pipelines:
+    needs: changed_files
+    runs-on: arc-neon-gke
+    #runs-on: ubuntu-latest
+    env:
+      PACHD_ADDRESS: grpcs://pachyderm-dev.transitions-nonprod.gcp.neoninternal.org:443
+      PACH_TOKEN: ${{ secrets.RepoOwnerPachydermDev }}
+      PATHS: 'pipe/enviroscan' # Separate multiple with comma (e.g. 'pipe/pqs1,pipe/parWaterSurface'). Order matters.
+      TRANSACTION: True
+      UPDATE_SCOPE: changed # 'all' or 'changed'. If not specified, all will be updated. 'changed' will update/create any changed/non-existent pipelines.
+      CHANGED_FILES: ${{ needs.changed_files.outputs.changed_file_list }}
+    steps:
+      - uses: actions/checkout@v4
+      - run: ls -la
+
+      - name: Update pipelines
+        uses: ./.github/actions/update-pipelines
+        with:
+          pachd_address: ${{ env.PACHD_ADDRESS }}
+          pach_token: ${{ env.PACH_TOKEN }}
+          paths: ${{ env.PATHS }}
+          transaction: ${{ env.TRANSACTION }}
+          update_scope: ${{ env.UPDATE_SCOPE }}
+          changed_files: ${{ env.CHANGED_FILES }}
+
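With fetch-depth: 0 the full history is available to the runner, so the get-changed-files action can compare the preceding commit to the current one, as the workflow comments describe. A sketch of the underlying comparison, assuming plain git (the action's actual implementation is not shown in this commit):

# Sketch: list files changed between the preceding commit and HEAD,
# the push-event comparison described above.
import subprocess

result = subprocess.run(
    ['git', 'diff', '--name-only', 'HEAD^', 'HEAD'],
    capture_output=True, text=True, check=True,
)
changed_files = result.stdout.splitlines()
print(','.join(changed_files))  # e.g. handed downstream as CHANGED_FILES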

.github/workflows/build_push_calibration_group_and_convert.yml

Lines changed: 1 addition & 0 deletions
@@ -7,6 +7,7 @@ on:
     paths:
       - 'modules_combined/calibration_group_and_convert/**'
       - 'modules/filter_joiner/**'
+      - 'modules/array_parser/**'
       - 'modules/common/**'
       - 'flow/flow.kfka.comb/**'
       - 'flow/flow.cal.conv/**'

modules/array_parser/array_parser.py

Lines changed: 9 additions & 7 deletions
@@ -36,15 +36,17 @@ def parse(config: Config) -> None:
 
 def link_calibration_file(path: Path, out_path, schema_data: SchemaData) -> None:
     stream_id = calibration_file_parser.get_stream_id(path)
-    field_name = schema_data.mapping.get(stream_id)
+    field_name = schema_data.calibration_mapping.get(stream_id)
     link_path = Path(out_path, field_name, path.name)
-    log.debug(f'calibration link: {link_path}')
-    link_path.parent.mkdir(parents=True, exist_ok=True)
-    link_path.symlink_to(path)
+    if not link_path.exists():
+        log.debug(f'calibration link: {link_path}')
+        link_path.parent.mkdir(parents=True, exist_ok=True)
+        link_path.symlink_to(path)
 
 
 def link_data_file(path: Path, out_path: Path) -> None:
     link_path = Path(out_path, path.name)
-    link_path.parent.mkdir(parents=True, exist_ok=True)
-    log.debug(f'data link: {link_path}')
-    link_path.symlink_to(path)
+    if not link_path.exists():
+        link_path.parent.mkdir(parents=True, exist_ok=True)
+        log.debug(f'data link: {link_path}')
+        link_path.symlink_to(path)
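The new exists() guard makes linking idempotent: Path.symlink_to raises FileExistsError when the link is already present, so re-running the parser over the same datum no longer fails. A tiny standalone illustration of the failure mode the guard avoids (paths here are invented):

# Standalone illustration of the guard added above.
from pathlib import Path

target = Path('/tmp/target.txt')
target.touch()
link = Path('/tmp/link.txt')
if not link.exists():       # without this, a second run raises FileExistsError
    link.symlink_to(target)

(Note that Path.exists() follows symlinks, so a dangling link would still trip symlink_to; that edge case is outside the scope of this change.)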

modules/array_parser/array_parser_main.py

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@ def main() -> None:
     schema_path: Path = env.path('SCHEMA_PATH')
     out_path: Path = env.path('OUT_PATH')
     parse_calibration = env.bool('PARSE_CALIBRATION')
-    log_level: str = env.log_level('LOG_LEVEL', 'INFO')
+    log_level: str = env.str('LOG_LEVEL', 'INFO')
     source_type_index: int = env.int('SOURCE_TYPE_INDEX')
     year_index: int = env.int('YEAR_INDEX')
     month_index: int = env.int('MONTH_INDEX')
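If env here is an environs.Env (consistent with the env.path/env.bool/env.int calls above), env.log_level parses the variable into a numeric logging level, whereas env.str keeps the level name as a plain string, which matches the str annotation. A small sketch under that assumption:

# Sketch assuming the 'environs' package, matching the env.* calls above.
from environs import Env

env = Env()
log_level: str = env.str('LOG_LEVEL', 'INFO')  # plain string, e.g. 'DEBUG' or 'INFO'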

modules/array_parser/data_file_parser.py

Lines changed: 29 additions & 9 deletions
@@ -63,21 +63,41 @@ def write_restructured_file(path: Path, out_path: Path, schema: Path) -> None:
     :param schema: The new schema for the reordered file.
     :return: None
     """
-    table = pq.read_table(path)
-    data_values = table.column(3)
-    data_type: pa.lib.ListType = data_values.type
+
+    # Read the schema
     schema_data: SchemaData = schema_parser.parse_schema_file(schema)
     field_names = schema_data.field_names
-    new_columns: List[list] = create_columns(field_names)
-    populate_columns(table, field_names, data_values, new_columns)
-    for i in range(0, len(new_columns)):
+
+    # Parse the array(s) into the new table
+    table = pq.read_table(path)
+    column_names = table.column_names
+    array_names = set(schema_data.data_mapping.values())
+    for array_name in array_names:
+        column_index = column_names.index(array_name)
+        data_values = table.column(column_index)
+        array_field_names = [key for key, value in schema_data.data_mapping.items() if value == array_name]  # field names pertaining to this array
+        parsed_columns: List[list] = create_columns(array_field_names)
+        data_type: pa.lib.ListType = data_values.type
+        populate_columns(table, array_field_names, data_values, parsed_columns)
+
         # convert to arrays with the appropriate type
-        column: pa.Array = pa.array(new_columns[i], data_type.value_type)
-        table: pa.Table = table.append_column(field_names[i], column)  # add column to table
-    table = table.remove_column(3)  # remove original data array from table
+        for i in range(0, len(parsed_columns)):
+            column: pa.Array = pa.array(parsed_columns[i], data_type.value_type)
+            table: pa.Table = table.append_column(array_field_names[i], column)  # add column to table
+
+    # remove original data arrays from table
+    for array_name in array_names:
+        column_names = table.column_names
+        column_index = column_names.index(array_name)
+        table = table.remove_column(column_index)
+
+    # Rearrange columns to match the parsed schema
+    table = table.select(field_names)
     metadata = get_metadata(schema_data)
     table = table.replace_schema_metadata(metadata)
     log.debug(f'modified_table:\n{table}')
+
+    # Output
     file_path = Path(out_path, path.name)
     file_path.parent.mkdir(parents=True, exist_ok=True)
     file_path.touch()
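The restructuring above explodes each raw array column into one scalar column per schema field, drops the array columns, and reorders to match the schema. A self-contained toy version of the same pyarrow technique (table, column, and field names here are invented for illustration, not taken from NEON data):

# Toy illustration of the array-parsing technique; not the NEON module itself.
import pyarrow as pa

table = pa.table({
    'readout_time': ['2024-01-01T00:00:00Z'],
    'depths': [[0.5, 1.5, 2.5]],  # raw array column, as named by '__raw_array_name'
})
data_mapping = {'depth_0': 'depths', 'depth_1': 'depths', 'depth_2': 'depths'}

for array_name in set(data_mapping.values()):
    data_values = table.column(table.column_names.index(array_name))
    field_names = [k for k, v in data_mapping.items() if v == array_name]
    rows = data_values.to_pylist()  # one python list per row
    for i, field_name in enumerate(field_names):
        # element i of every row becomes its own column, typed like the list's values
        column = pa.array([row[i] for row in rows], data_values.type.value_type)
        table = table.append_column(field_name, column)
    table = table.remove_column(table.column_names.index(array_name))

print(table.column_names)  # ['readout_time', 'depth_0', 'depth_1', 'depth_2']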

modules/array_parser/schema_parser.py

Lines changed: 16 additions & 8 deletions
@@ -8,31 +8,39 @@ class SchemaData(NamedTuple):
     schema: str
     source_type: str
     field_names: List[str]
-    mapping: dict
+    parse_field_names: List[str]
+    calibration_mapping: dict
+    data_mapping: dict
 
 
 def parse_schema_file(path: Path) -> SchemaData:
     """
-    Get the mapping between stream IDs and schema field names.
+    Get the mapping between stream IDs and schema field names for any applicable calibration data.
+    Also get the mapping between schema field names and array names.
 
     :param path: The file path.
-    :return: The source name and the mapping between stream IDs and schema field names.
+    :return: The source name and the mappings: stream IDs -> schema field names, and schema field names -> array names (i.e. which array each field is in).
     """
-    field_exclusions = ['source_id', 'site_id', 'readout_time']
+    field_exclusions = ['source_id', 'site_id', 'readout_time']  # Assumes all other fields are fields to be parsed.
     with open(str(path), 'r') as file:
         json_data = json.load(file)
     source_type = json_data['source']
     fields = json_data['fields']
-    mapping = {}
+    calibration_mapping = {}
+    data_mapping = {}
     field_names = []
+    parse_field_names = []
     for field in fields:
         name = field['name']
+        field_names.append(name)
         if name not in field_exclusions:
-            field_names.append(name)
+            parse_field_names.append(name)
             try:
                 stream_id = field['__neon_stream_id']
-                mapping[stream_id] = name
+                array_name = field['__raw_array_name']
+                calibration_mapping[stream_id] = name
+                data_mapping[name] = array_name
             except KeyError:
                 continue
     schema = json.dumps(json_data)
-    return SchemaData(schema=schema, source_type=source_type, field_names=field_names, mapping=mapping)
+    return SchemaData(schema=schema, source_type=source_type, field_names=field_names, parse_field_names=parse_field_names, calibration_mapping=calibration_mapping, data_mapping=data_mapping)
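To make the two mappings concrete, here is a hypothetical schema fragment and what parse_schema_file would derive from it (all field names and stream IDs invented):

# Hypothetical schema fields; '__neon_stream_id' ties a field to a calibration
# stream, '__raw_array_name' names the raw array column the field is parsed from.
fields = [
    {'name': 'readout_time'},  # excluded from parsing
    {'name': 'depth_0', '__neon_stream_id': '101', '__raw_array_name': 'depths'},
    {'name': 'depth_1', '__neon_stream_id': '102', '__raw_array_name': 'depths'},
]
# parse_schema_file would yield:
#   field_names         = ['readout_time', 'depth_0', 'depth_1']
#   parse_field_names   = ['depth_0', 'depth_1']
#   calibration_mapping = {'101': 'depth_0', '102': 'depth_1'}
#   data_mapping        = {'depth_0': 'depths', 'depth_1': 'depths'}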

modules/calval_loader/get_calibration_stream_name.py

Lines changed: 2 additions & 1 deletion
@@ -28,12 +28,13 @@ def get_calibration_stream_name(connection, asset_type: str, stream_number: int)
     and
     is_asset_definition.sensor_type_name = %(sensor_type_name)s
     '''
+    # print(f'Finding stream name for asset_type: {asset_type} and stream_number: {stream_number}')
     with closing(connection.cursor()) as cursor:
         cursor.execute(sql, dict(sensor_type_name=asset_type, stream_number=stream_number))
         row = cursor.fetchone()
         if row is None:
             logging.error(f'Stream name not found for stream ID {stream_number} and asset type {asset_type}.')
             return None
         stream_name = row[0]
-        # print(f'stream_name: {stream_name}')
+        # print(f'asset_type: {asset_type} stream_name: {stream_name}')
         return stream_name

modules/calval_loader/load_all_calval_files.py

Lines changed: 2 additions & 3 deletions
@@ -48,9 +48,8 @@ def load() -> None:
         stream_id = root.find('StreamCalVal').find('StreamID').text
         stream_name = get_calibration_stream_name(connector.get_connection(), avro_schema_name,
                                                   stream_id)
-        print('repo name , asset_id, stream_name, filename are :', avro_schema_name, " ", asset_id,
-              " ",
-              stream_name, " ", filename)
+        print('schema name , asset_id, stream_id, stream_name, filename are :', avro_schema_name, " ", asset_id,
+              " ", stream_id, " ", stream_name, " ", filename)
         try:
             output_path = Path(output_directory, avro_schema_name, asset_id, stream_name, filename)
             output_path.parent.mkdir(parents=True, exist_ok=True)

modules_combined/calibration_group_and_convert/Dockerfile

Lines changed: 5 additions & 1 deletion
@@ -1,4 +1,4 @@
-# Dockerfile for NEON IS Data Processing - combined filter-joiner, kafka combiner, and Calibration Conversion
+# Dockerfile for NEON IS Data Processing - combined filter-joiner, kafka combiner, array parser, and Calibration Conversion
 # Example command (must be run from project parent directory to include modules/ and flow/ paths in Docker context):
 # docker build -t neon-is-cal-grp-conv -f ./modules_combined/calibration_group_and_convert/Dockerfile .
 
@@ -12,13 +12,15 @@ MAINTAINER "Cove Sturtevant" csturtevant@battelleecology.org
 # Add in the python-based filter-joiner module
 ARG MODULE_DIR="modules"
 ARG APP_DIR="filter_joiner"
+ARG APP_DIR_2="array_parser"
 ARG COMMON_DIR="common"
 ARG CONTAINER_APP_DIR="/usr/src/app"
 ENV PYTHONPATH="${PYTHONPATH}:${CONTAINER_APP_DIR}"
 
 WORKDIR ${CONTAINER_APP_DIR}
 
 COPY ${MODULE_DIR}/${APP_DIR}/requirements.txt ${CONTAINER_APP_DIR}/${APP_DIR}/requirements.txt
+COPY ${MODULE_DIR}/${APP_DIR_2}/requirements.txt ${CONTAINER_APP_DIR}/${APP_DIR_2}/requirements.txt
 
 
 RUN apt update && \
@@ -27,6 +29,7 @@ RUN apt update && \
     apt install -y python3-pip && \
     python3 -mpip install --no-cache-dir --upgrade pip setuptools wheel && \
    python3 -mpip install --no-cache-dir -r ${CONTAINER_APP_DIR}/${APP_DIR}/requirements.txt && \
+    python3 -mpip install --no-cache-dir -r ${CONTAINER_APP_DIR}/${APP_DIR_2}/requirements.txt && \
     apt-get autoremove -y && \
     apt-get autoclean -y && \
     rm -rf /var/lib/apt/lists/* && \
@@ -35,6 +38,7 @@ RUN apt update && \
 
 # Copy in python code
 COPY ${MODULE_DIR}/${APP_DIR} ${CONTAINER_APP_DIR}/${APP_DIR}
+COPY ${MODULE_DIR}/${APP_DIR_2} ${CONTAINER_APP_DIR}/${APP_DIR_2}
 COPY ${MODULE_DIR}/${COMMON_DIR} ${CONTAINER_APP_DIR}/${COMMON_DIR}