2 changes: 1 addition & 1 deletion aframe/base.py
@@ -111,7 +111,7 @@ def sandbox_env(self, _):
# map in local tmpdir (should be /local/albert.einstein)
# which has enough memory to write large temp
# files with luigi/law
env["TMPDIR"] = f"/local/{os.getenv('USER')}"
env["TMPDIR"] = os.getenv("AFRAME_TMPDIR")
Contributor:
Should mention this variable in the README along with the others that you suggest putting into a .env


# if gpus are specified, expose them inside container
# via CUDA_VISIBLE_DEVICES env variable
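For illustration, a minimal sketch (not part of this change) of how AFRAME_TMPDIR could be consumed with a fallback to the old per-user /local path, assuming the variable is documented in the README and exported from a .env file as the comment above suggests:

```python
import os

# Hypothetical fallback: prefer AFRAME_TMPDIR (e.g. exported from a .env file),
# but keep the previous /local/<user> location when it is not set.
env = {}  # stands in for the sandbox environment dict built in sandbox_env
env["TMPDIR"] = os.getenv("AFRAME_TMPDIR", f"/local/{os.getenv('USER')}")
```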
5 changes: 3 additions & 2 deletions aframe/pipelines/sandbox/sandbox.cfg
@@ -63,10 +63,11 @@ reference_frequency = &::luigi_base::reference_frequency
waveform_approximant = &::luigi_base::waveform_approximant
prior = &::luigi_base::prior


[luigi_ValidationWaveforms]
workflow = htcondor
-num_jobs = 20
-num_signals = 1000
+num_jobs = 200
+num_signals = 20000
ifos = &::luigi_base::ifos
highpass = &::luigi_base::highpass
sample_rate = &::luigi_base::sample_rate
1 change: 1 addition & 0 deletions aframe/pipelines/seed_variance/__init__.py
@@ -0,0 +1 @@
from .seed import SeedVariability
167 changes: 167 additions & 0 deletions aframe/pipelines/seed_variance/seed.cfg
@@ -0,0 +1,167 @@
# luigi level config
[luigi_core]
local_scheduler = true
module = aframe
log_level = INFO

# configuration for pipeline parameters

[luigi_base]
ifos = ["H1", "L1"]

# data generation parameters
train_start = 1240579783
train_stop = 1241443783
test_stop = 1244035783
max_duration = 20000
Tb = 3153600
flag = DATA
channels = ["H1", "L1"]
shifts = [0, 1]
seed = 1122

streams_per_gpu = 2

# waveform parameters
waveform_approximant = IMRPhenomPv2
waveform_duration = 8
minimum_frequency = 20
reference_frequency = 50

# training parameters
kernel_length = 1.5
batch_size = 512
prior = priors.priors.end_o3_ratesandpops

# data conditioning / preprocessing parameters
sample_rate = 2048
fduration = 1
highpass = 32

# inference / export parameters
inference_psd_length = 64
inference_sampling_rate = 4
inference_batch_size = 128

[luigi_FetchTrain]
workflow = htcondor
start = &::luigi_base::train_start
end = &::luigi_base::train_stop
sample_rate = &::luigi_base::sample_rate
min_duration = 1024
max_duration = &::luigi_base::max_duration
flag = &::luigi_base::flag
ifos = &::luigi_base::ifos
channels = &::luigi_base::channels
request_memory = 32678

[luigi_TrainingWaveforms]
num_signals = 100000
sample_rate = &::luigi_base::sample_rate
waveform_duration = &::luigi_base::waveform_duration
minimum_frequency = &::luigi_base::minimum_frequency
reference_frequency = &::luigi_base::reference_frequency
waveform_approximant = &::luigi_base::waveform_approximant
prior = &::luigi_base::prior

[luigi_FetchTest]
workflow = htcondor
start = &::luigi_base::train_stop
end = &::luigi_base::test_stop
sample_rate = &::luigi_base::sample_rate
min_duration = 128
max_duration = &::luigi_base::max_duration
flag = &::luigi_base::flag
channels = &::luigi_base::channels
request_memory = 32678


[luigi_TimeslideWaveforms]
workflow = htcondor
seed = 112296
Tb = &::luigi_base::Tb
shifts = &::luigi_base::shifts
spacing = 16
buffer = 16
snr_threshold = 4
prior = &::luigi_base::prior
start = &::luigi_base::train_stop
end = &::luigi_base::test_stop
ifos = &::luigi_base::ifos
psd_length = &::luigi_base::inference_psd_length
sample_rate = &::luigi_base::sample_rate
minimum_frequency = &::luigi_base::minimum_frequency
reference_frequency = &::luigi_base::reference_frequency
waveform_duration = &::luigi_base::waveform_duration
waveform_approximant = &::luigi_base::waveform_approximant
highpass = &::luigi_base::highpass

[luigi_ValidationWaveforms]
workflow = htcondor
num_jobs = 200
num_signals = 20000
ifos = &::luigi_base::ifos
highpass = &::luigi_base::highpass
sample_rate = &::luigi_base::sample_rate
waveform_duration = &::luigi_base::waveform_duration
minimum_frequency = &::luigi_base::minimum_frequency
reference_frequency = &::luigi_base::reference_frequency
waveform_approximant = &::luigi_base::waveform_approximant
prior = &::luigi_base::prior
snr_threshold = 4

[luigi_SeedVarExport]
fduration = &::luigi_base::fduration
kernel_length = &::luigi_base::kernel_length
inference_sampling_rate = &::luigi_base::inference_sampling_rate
sample_rate = &::luigi_base::sample_rate

# TODO: resolve enum platform parsing error
# platform = luigi.Parameter(default="TENSORRT")
ifos = &::luigi_base::ifos
batch_size = &::luigi_base::inference_batch_size
psd_length = &::luigi_base::inference_psd_length
highpass = &::luigi_base::highpass

streams_per_gpu = &::luigi_base::streams_per_gpu
aframe_instances = 1
preproc_instances = 1
clean = true

[luigi_Train]
config = /home/ethan.marx/projects/aframev2/projects/train/config.yaml
train_remote = false
ifos = &::luigi_base::ifos
kernel_length = &::luigi_base::kernel_length
highpass = &::luigi_base::highpass
fduration = &::luigi_base::fduration
seed = &::luigi_base::seed
use_wandb = true
request_gpus = 2

[luigi_SeedVarInfer]
fduration = &::luigi_base::fduration
batch_size = &::luigi_base::inference_batch_size
psd_length = &::luigi_base::inference_psd_length
ifos = &::luigi_base::ifos
inference_sampling_rate = &::luigi_base::inference_sampling_rate
cluster_window_length = 8
integration_window_length = 1
Tb = &::luigi_base::Tb
shifts = &::luigi_base::shifts
streams_per_gpu = &::luigi_base::streams_per_gpu
rate_per_gpu = 70

# triton args
model_name = aframe-stream
model_version = -1
triton_image = hermes/tritonserver:22.12
sequence_id = 1001


[logging]
law: INFO
law.sandbox.base: DEBUG
law.patches: INFO
luigi-interface: INFO
law.workflow.base: DEBUG
85 changes: 85 additions & 0 deletions aframe/pipelines/seed_variance/seed.py
@@ -0,0 +1,85 @@
"""
Train multiple models with different seeds
for comparing performance variability
"""
import os

import luigi
import numpy as np

from aframe.base import AframeWrapperTask
from aframe.pipelines.config import paths
from aframe.tasks import ExportLocal, TimeslideWaveforms, Train
from aframe.tasks.infer import InferLocal
from aframe.tasks.plots.sv import SensitiveVolume


class SeedVarExport(ExportLocal):
train_seed = luigi.IntParameter()

def requires(self):
return Train.req(
self,
data_dir=paths().train_datadir,
run_dir=os.path.join(paths().train_rundir, str(self.train_seed)),
seed=self.train_seed,
train_remote=True,
)


class SeedVarInfer(InferLocal):
train_seed = luigi.IntParameter()

def requires(self):
reqs = {}
reqs["model_repository"] = SeedVarExport.req(
self,
repository_directory=os.path.join(
paths().results_dir, "model_repo"
),
train_seed=self.train_seed,
)
ts_waveforms = TimeslideWaveforms.req(
self,
output_dir=paths().test_datadir,
)
fetch = ts_waveforms.requires().workflow_requires()["test_segments"]

reqs["data"] = fetch
reqs["waveforms"] = ts_waveforms
return reqs


class SeedVarSV(SensitiveVolume):
train_seed = luigi.IntParameter()

def requires(self):
reqs = {}
reqs["ts"] = TimeslideWaveforms.req(
self, output_dir=paths().test_datadir
)

reqs["infer"] = SeedVarInfer.req(
self,
output_dir=os.path.join(paths().results_dir, "infer"),
train_seed=self.train_seed,
)
return reqs


class SeedVariability(AframeWrapperTask):
num_seeds = luigi.IntParameter(
default=2,
description="Number of training jobs with unique seeds to launch",
)

def requires(self):
seeds = np.random.randint(0, 1e5, size=self.num_seeds)
for seed in seeds:
Contributor:
Does this mean that the different seeds are done sequentially? Or does only the scheduling happen sequentially, while all the training/inference happens in parallel?

Contributor (Author):
So they'll still get scheduled in "parallel", which is actually sort of problematic at inference time when they'll be fighting for GPUs.

This is mostly why I'm holding off on merging this. Should find a cleaner solution.

            yield SeedVarSV.req(
                self,
                train_seed=seed,
                output_dir=os.path.join(
                    paths().results_dir, str(seed), "plots"
                ),
            )
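On the scheduling discussion above: one possible way to serialize the per-seed pipelines, so they stop competing for GPUs at inference time, is luigi's dynamic dependencies, yielding each downstream task from run() one at a time instead of returning them all from requires(). A rough sketch under that assumption (hypothetical, not what this PR does; real use would also need a marker output or an overridden complete() so the wrapper is not re-launched on every invocation):

```python
import os

import luigi
import numpy as np

from aframe.pipelines.config import paths
from aframe.pipelines.seed_variance.seed import SeedVarSV


class SequentialSeedVariability(luigi.Task):
    num_seeds = luigi.IntParameter(
        default=2,
        description="Number of training jobs with unique seeds to launch",
    )

    def run(self):
        # Fixed RNG so the seed list is identical every time run() is
        # re-entered; luigi calls run() again after each yielded dependency
        # completes.
        seeds = np.random.default_rng(0).integers(0, 100_000, size=self.num_seeds)
        for seed in seeds:
            # Yielding one task at a time suspends this task until that
            # pipeline finishes, so the seeds run back to back instead of
            # fighting over GPUs.
            yield SeedVarSV.req(
                self,
                train_seed=int(seed),
                output_dir=os.path.join(
                    paths().results_dir, str(seed), "plots"
                ),
            )
```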
11 changes: 8 additions & 3 deletions aframe/tasks/data/condor/base.py
@@ -15,10 +15,14 @@ class LDGCondorWorkflow(htcondor.HTCondorWorkflow):
condor_directory = luigi.Parameter()
accounting_group_user = luigi.Parameter(default=os.getenv("LIGO_USERNAME"))
accounting_group = luigi.Parameter(default=os.getenv("LIGO_GROUP"))
-request_disk = luigi.Parameter(default="1024")
-request_memory = luigi.Parameter(default="32678")
+request_disk = luigi.Parameter(default="1024K")
+request_memory = luigi.Parameter(default="4096M")
request_cpus = luigi.IntParameter(default=1)

+# don't pass computing requirements between tasks
+# since different tasks will often have different requirements
+exclude_params_req = {"request_memory", "request_disk", "request_cpus"}

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.htcondor_log_dir.touch()
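For context on the exclusion above, a small sketch of the behavior with illustrative task names: in law, Task.req() copies matching parameter values from the calling task onto the new instance unless the receiving class lists them in exclude_params_req.

```python
import law
import luigi


class Fetch(law.Task):
    request_memory = luigi.Parameter(default="4096M")

    # don't inherit resource requests from whichever task calls Fetch.req()
    exclude_params_req = {"request_memory"}


class Analysis(law.Task):
    request_memory = luigi.Parameter(default="32G")

    def requires(self):
        # request_memory is excluded, so Fetch keeps its own default (or the
        # value set for it in the config) rather than inheriting "32G"
        return Fetch.req(self)
```

This is also why tasks like FetchTrain and FetchTest in the config above carry their own request_memory settings.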
@@ -50,7 +54,7 @@ def htcondor_log_dir(self):

@property
def job_file_dir(self):
-return self.htcondor_output_directory().child("jobs", type="d").path
+return os.path.join(self.condor_directory, "jobs")

@property
def law_config(self):
@@ -73,6 +77,7 @@ def build_environment(self):
environment += f'PATH={os.getenv("PATH")} '
environment += f"LAW_CONFIG_FILE={self.law_config} "
environment += f"USER={os.getenv('USER')} "
environment += f"TMPDIR={os.getenv('TMPDIR')} "
Contributor:
Should this be AFRAME_TMPDIR?


# forward any env variables that start with AFRAME_
# that the law config may need to parse
4 changes: 3 additions & 1 deletion aframe/tasks/data/timeslide_waveforms/timeslide_waveforms.py
@@ -179,7 +179,9 @@ def output(self):
]

def requires(self):
-return DeployTimeslideWaveforms.req(self)
+return DeployTimeslideWaveforms.req(
+    self, condor_directory=self.condor_directory
+)

@property
def targets(self):
Expand Down
26 changes: 15 additions & 11 deletions aframe/tasks/data/waveforms.py
@@ -98,9 +98,12 @@ class DeployValidationWaveforms(
Generate waveforms for validation via rejection sampling
"""

-output_dir = luigi.Parameter(
-    description="Directory where validation waveforms will be saved"
+tmp_dir = luigi.Parameter(
+    description="Directory where temporary validation "
+    "waveforms will be saved before being merged",
+    default=os.getenv("AFRAME_TMPDIR", "/tmp/aframe/"),
)
+output_dir = luigi.Parameter()
ifos = luigi.ListParameter(
description="Interferometers for which waveforms will be generated"
)
@@ -126,12 +129,12 @@ def workflow_requires(self):
return reqs

@property
-def tmp_dir(self):
-    return os.path.join(self.output_dir, f"tmp-{self.branch}")
+def branch_tmp_dir(self):
+    return os.path.join(self.tmp_dir, f"tmp-{self.branch}")

def output(self):
return law.LocalFileTarget(
-os.path.join(self.tmp_dir, "val_waveforms.hdf5")
+os.path.join(self.branch_tmp_dir, "val_waveforms.hdf5")
)

def create_branch_map(self):
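Illustratively (hypothetical branch index), the new split keeps per-branch temporaries under the tmp_dir parameter and reserves output_dir for the merged file:

```python
import os

tmp_dir = os.getenv("AFRAME_TMPDIR", "/tmp/aframe/")  # the parameter's default
branch = 3  # hypothetical law branch index

branch_tmp_dir = os.path.join(tmp_dir, f"tmp-{branch}")
print(branch_tmp_dir)                                      # /tmp/aframe/tmp-3
print(os.path.join(branch_tmp_dir, "val_waveforms.hdf5"))  # per-branch output
```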
@@ -203,7 +206,7 @@ class ValidationWaveforms(AframeDataTask):
"""

output_dir = luigi.Parameter(
description="Directory where validation waveforms will be saved"
description="Directory where merged validation waveforms will be saved"
)
condor_directory = luigi.Parameter(
default=os.path.join(
@@ -217,10 +220,12 @@ def __init__(self, *args, **kwargs):
self.output_file = os.path.join(self.output_dir, "val_waveforms.hdf5")

def output(self):
-return (s3_or_local(self.output_file),)
+return s3_or_local(self.output_file)

def requires(self):
-return DeployValidationWaveforms.req(self)
+return DeployValidationWaveforms.req(
+    self,
+)

@property
def targets(self):
@@ -233,9 +238,8 @@ def waveform_files(self):
def run(self):
from ledger.injections import LigoWaveformSet

-LigoWaveformSet.aggregate(
-    self.waveform_files, self.output_file, clean=True
-)
+with self.output().open("w") as f:
+    LigoWaveformSet.aggregate(self.waveform_files, f, clean=True)

# clean up temporary directories
for dirname in glob.glob(os.path.join(self.output_dir, "tmp-*")):