Skip to content

WIP add downloading part from fink_ai topics in datatransfert #274 #275

Open
Farid841 wants to merge 1 commit into
astrolabsoftware:masterfrom
Farid841:master
Open

WIP add downloading part from fink_ai topics in datatransfert #274 #275
Farid841 wants to merge 1 commit into
astrolabsoftware:masterfrom
Farid841:master

Conversation

@Farid841

Copy link
Copy Markdown

No description provided.

Copilot AI review requested due to automatic review settings June 11, 2026 10:28

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

Adds first-class support to finkctl_transfer for “AI transfer” topics by consuming JSON predictions, joining them with original Avro alerts from a companion feed topic, and writing enriched Parquet output.

Changes:

  • Detect fink_ai_* topics and route them through a new AI transfer path.
  • Add logic to read/flatten alerts, parse/fix Avro schema, join with predictions, and write Parquet datasets.
  • Expand documentation and topic validation messaging to include AI topics.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +292 to +300
rng = np.random.RandomState(42)
pq.write_to_dataset(
table,
args.outdir,
schema=arrow_schema,
basename_template="part-0-{{i}}-{}.parquet".format(rng.randint(0, int(1e9))),
partition_cols=partitioning,
existing_data_behavior="overwrite_or_ignore",
)
Comment on lines +567 to +571
valid_prefixes = ("ftransfer", "fxmatch", _AI_TOPIC_PREFIX)
if not args.topic.startswith(valid_prefixes):
msg = """
{} is not a valid topic name.
Topic name must start with `ftransfer_` or `fxmatch_`.
Topic name must start with `ftransfer_`, `fxmatch_`, or `fink_ai_`.
Comment on lines +97 to +107
def _get_ai_schema(kafka_config: dict, feed_topic: str, maxtimeout: float):
"""Return parsed Avro schema for the feed topic, with Spark union fix applied."""
raw = get_schema_from_stream(kafka_config, feed_topic, maxtimeout)
if raw is None:
return None
# raw is already parsed by get_schema_from_stream — re-parse with the fix
# by extracting the original dict and re-parsing
try:
return fastavro.parse_schema(_fix_spark_schema(raw))
except Exception:
return raw
Comment on lines +151 to +154
try:
rec = json.loads(msg.value())
except (json.JSONDecodeError, Exception):
continue
Comment on lines +253 to +278
rows = []
for candid, pred in predictions.items():
row = {"candid": candid, **pred}
if candid in alerts:
row.update({k: v for k, v in alerts[candid].items() if k != "candid"})
rows.append(row)

all_keys = list(rows[0].keys())
for row in rows[1:]:
for k in row:
if k not in all_keys:
all_keys.append(k)

columns = {k: [row.get(k) for row in rows] for k in all_keys}

arrays = {}
for k, vals in columns.items():
if k == "predictions":
arrays[k] = pa.array(vals, type=pa.list_(pa.float64()))
else:
try:
arrays[k] = pa.array(vals)
except Exception:
arrays[k] = pa.array([str(v) if v is not None else None for v in vals])

table = pa.table(arrays)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants