Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 51 additions & 3 deletions projects/online/online/subprocesses/p_astro.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import logging
import time
from pathlib import Path
from typing import TYPE_CHECKING

from torch.multiprocessing import Queue

from .utils import subprocess_wrapper

import h5py
import pickle

from ledger.events import EventSet, RecoveredInjectionSet
Expand All @@ -19,6 +21,10 @@

logger = logging.getLogger("pastro-process")

TIMEOUT = 10
TIMESTEP = 1e-3
MAX_RETRIES = int(TIMEOUT / TIMESTEP)


def fit_or_load_pastro(
model_path: Path,
Expand Down Expand Up @@ -96,9 +102,51 @@ def pastro_subprocess(
while True:
event = pastro_queue.get()
logger.info("Calculating p_astro")
pastro = pastro_model(event.detection_statistic)
pastro = float(pastro_model(event.detection_statistic))
graceid = pastro_queue.get()

logger.info(f"Submitting p_astro: {pastro} for {graceid}")
gdb.submit_pastro(float(pastro), graceid, event.event_dir)
event_dir = outdir / "events" / event.event_dir
posterior_file = event_dir / "amplfi.posterior_samples.hdf5"

retries = 0
while True:
try:
with h5py.File(posterior_file, "r") as f:
samples = f["posterior_samples"][:]
m1_source = samples["mass_1_source"]
m2_source = samples["mass_2_source"]

logger.info("Read posteriors from file")
num_samples = len(m1_source)
bns_frac = sum(m1_source < 3) / num_samples
bbh_frac = sum(m2_source > 3) / num_samples
nsbh_frac = 1 - bns_frac - bbh_frac

probs = {
"BBH": pastro * bbh_frac,
"NSBH": pastro * nsbh_frac,
"BNS": pastro * bns_frac,
"Terrestrial": 1 - pastro,
}

break
except Exception:
time.sleep(TIMESTEP)
retries += 1

if retries >= MAX_RETRIES:
logging.info(
f"Posterior file not found after {TIMEOUT} seconds, "
"assigning all probability to BBH"
)
probs = {
"BBH": pastro,
"NSBH": 0,
"BNS": 0,
"Terrestrial": 1 - pastro,
}
break

logger.info(f"Submitting p_astro: {probs} for {graceid}")
gdb.submit_pastro(probs, graceid, event.event_dir)
logger.info(f"Submitted p_astro for {graceid}")
33 changes: 10 additions & 23 deletions projects/online/online/utils/gdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import h5py
from gwpy.time import tconvert
from ligo.gracedb.rest import GraceDb as _GraceDb
from ligo.em_bright import em_bright
from ligo.skymap.tool.ligo_skymap_plot import main as ligo_skymap_plot
from ligo.skymap.io.fits import write_sky_map
from online.utils.searcher import Event
Expand Down Expand Up @@ -207,19 +206,9 @@ def submit_low_latency_pe(
posterior_samples = posterior_df.to_records(index=False)
with h5py.File(filename, "w") as f:
f.create_dataset("posterior_samples", data=posterior_samples)

_, has_ns, _, _ = em_bright.source_classification_pe(
filename, num_eos_draws=10
)
if has_ns > 0:
self.logger.info(
f"Event {graceid} had HasNS = {has_ns}, so {filename} "
" was not uploaded."
)
else:
self.write_log(
graceid, "posterior", filename=filename, tag_name="pe"
)
self.logger.debug("Submitting posterior samples to GraceDB")
self.write_log(graceid, "posterior", filename=filename, tag_name="pe")
self.logger.debug("Posterior samples submitted")

# update event with source parameters
self.update_event(event, graceid, result)
Expand Down Expand Up @@ -339,18 +328,13 @@ def submit_skymap_plots(self, graceid: str, event_dir: Path):
# tag_name="sky_loc",
# )

def submit_pastro(self, pastro: float, graceid: str, event_dir: Path):
def submit_pastro(
self, probs: dict[str, float], graceid: str, event_dir: Path
):
event_dir = self.write_dir / event_dir
fname = event_dir / "aframe.p_astro.json"
pastro = {
"BBH": pastro,
"Terrestrial": 1 - pastro,
"NSBH": 0,
"BNS": 0,
}

with open(fname, "w") as f:
json.dump(pastro, f)
json.dump(probs, f)

self.write_log(
graceid,
Expand All @@ -371,3 +355,6 @@ def create_event(self, filename: str, **_):

def write_log(self, *args, **kwargs):
pass

def replace_event(self, *args, **kwargs):
pass
2 changes: 2 additions & 0 deletions projects/online/online/utils/pe.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ def postprocess_samples(
)
posterior["mass_1"] = mass_1
posterior["mass_2"] = mass_2
posterior["mass_1_source"] = mass_1 / (1 + z_vals)
posterior["mass_2_source"] = mass_2 / (1 + z_vals)
# add time column so ligo-skymap-from-samples
# can add the gpstime metadata attribute
posterior["time"] = np.ones_like(mass_1) * event_time
Expand Down
1 change: 0 additions & 1 deletion projects/online/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ dependencies = [
"matplotlib==3.9.4",
"ligo-skymap>=2.4.0,<3",
"ligo-gracedb>=2.14.1",
"ligo-em-bright>=1.2.2",
"tables>=3.9",
]

Expand Down
61 changes: 0 additions & 61 deletions projects/online/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.