Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
268 changes: 268 additions & 0 deletions analysator/calculations/fieldtracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,3 +406,271 @@ def static_field_tracer_3d( vlsvReader, seed_coords, max_iterations, dx, directi

return points_traced # list for fg; 3d numpy array(N,maxiterations,3) for vg


class streaklines:
"""
Class to analyze temporal pathlines and streaklines between a range of Vlasiator time steps

Use:
seed = [1e8, 0, 0]
files_list = ["./bulk1.0001300.vlsv","./bulk1.00013005.vlsv"]
streakline_object = pt.calculations.streaklines(files_list = files_list, seed_points = seed)
#extract pathline
pathline = streakline_object.get_pathline(seed_idx = 0, time = 1303)
"""

def __init__(self, vlsvTObject = None, files_list = None, seed_points = None, direction="+", points_per = 10, dt_step = 0.1, var = "vg_v", method = "euler"):

if files_list is not None:
#adding as readers since VlsvTInterpolator has no caching
readers = [pt.vlsvfile.VlsvReader(f, indexer = "dict") for f in files_list]
vlsvTObject = pt.calculations.VlsvTInterpolator(vlsvReaders_list = readers)

self.M = self.streaklines_3D(vlsvTObject=vlsvTObject,
seed_points=seed_points, direction=direction,points_per=points_per,
dt_step = dt_step,var = var, method = method)

self.time_range = [min(vlsvTObject.ts), max(vlsvTObject.ts)]

pass


def streaklines_3D(self, vlsvTObject = None, files_list = None, seed_points = None, direction="+", dt_step=0.1, max_dx = 1e5, method = "euler", points_per = 10, var = "vg_v"):
"""
Streaklines 3D() integrates along dynamic field-grid vector field to calculate a final position.
Code uses Euler or RK4 method to conduct the tracing.
Each seed gives a (i,j,3) matrix where the i-axis is the coordinate time, and j-axis is the release time of the particles
i=j gives the seed point/release point where particles are started to track. For consistent logic keep integrations steps divisible by points_per
algorithm forward:
-open file i
-place particle in i=j
-record all particle positions
-move all particles j<=i according to velocity at time i
-current_pos += velocity*dt
loop till end
keep track of current position using current_pos = (N,T*P,3)

param vlsvTObject: A callable object that can be used to interpolate values in time and space from the given files
param files_list: List containing paths to VLSV files
param seed_points: list of seed points for streaklines (meters)
param direction: direction of integration (+,-, both)
param dt_step: time step size of integration
param var: integration variable
param method: integration step method (euler, RK4)
param points_per number of points recorded between time steps

returns: (N, T*P+1-P, T*P+1-P, 3) matrix where N: number of seed points, T: number of timesteps, P: points per timestep, and 3D coordinates
"""

import math

#Initial input checks
if (vlsvTObject is None) and (files_list is None):
raise ValueError("Provide either a VlsvTInterpolator object or a list of vlsv file paths")
elif (vlsvTObject is not None) and (files_list is not None):
raise ValueError("Please only provide one source of files")
if (seed_points is None):
raise ValueError("Please provide seed points")


if files_list is not None:
#adding as readers since VlsvTInterpolator has no caching
readers = [pt.vlsvfile.VlsvReader(f, indexer = "dict") for f in files_list]
vlsvTObject = pt.calculations.VlsvTInterpolator(vlsvReaders_list = readers)
#vlsvTObject = pt.calculations.VlsvTInterpolator(files_list)

#number of time steps
T = len(vlsvTObject.files)
#number of seed points
N = len(seed_points)
#effective number of time steps due to inbetween recordings
T_eff = T*points_per-points_per+1

#helper functions
def active_indices(idx):
if direction =="+":
return range(0, idx+1)
return range(idx, T_eff)

def inject_particles(idx):
for n in range(N):
current_pos[n,idx, :] = seed_points[n]

def record_particles(i_idx, js):
for n in range(N):
for j in js:
M[n, i_idx, j, :] = current_pos[n, j, :]

def _get_var(t, positions):
return vlsvTObject(t, positions, var)

#function trivialize change of integration method
def integration_step(t_step, dt_s, n_idx, j_idx, method = method):
#dp is the displacement amount in a integration step
"""
pram t_step: time step is taken
pram dt_s: size of the integration step
pram n_idx: seed point index array
pram j_idx: release time index array

"""

positions = current_pos[n_idx, j_idx]

if method == "euler":
#classic euler step method of integration
dx = _get_var(t_step, positions)
elif method == "RK4":
#runge-kutta 4 integration method, see: https://en.wikipedia.org/wiki/Runge%E2%80%93Kutta_methods
k1 = _get_var(t_step, positions)
k2 = _get_var(t_step + dt_s/2, positions + sign*k1*dt_s/2)
k3 = _get_var(t_step + dt_s/2, positions + sign*k2*dt_s/2)
k4 = _get_var(t_step + dt_s, positions + sign*dt_s*k3)
dx = 1/6*(k1 + 2*k2 + 2*k3 + k4)
else:
raise ValueError("not a valid integrations type")

#update current particle positions according to the step
current_pos[n_idx, j_idx, :] += sign*dt_s*dx


if direction == "both":

M_plus = self.streaklines_3D(vlsvTObject = vlsvTObject, seed_points = seed_points, direction = "+", points_per = points_per, var = var, method=method, dt_step= dt_step)
M_minus = self.streaklines_3D(vlsvTObject = vlsvTObject, seed_points = seed_points, direction = "-", points_per = points_per, var = var,method=method, dt_step = dt_step)
M_both = np.nansum(np.stack([M_plus, M_minus], axis = 0), axis = 0)

#fix diagonal
for t in range(T_eff):
M_both[:,t,t,:] = M_plus[:,t,t,:]
return M_both

#Full matrix to be filled with coordinates
M = np.full((N, T_eff, T_eff, 3), np.nan)
#records the current positions of the tracked plasma
current_pos = np.full((N, T_eff, 3), np.nan)

#sign keeps track of integration direction
sign = 1 if direction == "+" else -1

file_indices = list(range(T)) if direction == "+" else list(range(T - 1, -1, -1))

#MAIN LOOP
for i, file_index in enumerate(file_indices):

eff_file_index = file_index*points_per

if i == 0:
#initial file read and timestep
t_0 = vlsvTObject.ts[file_index]

#INJECTING ON DIAGONAL POINTS AT TIMESTEPS
inject_particles(eff_file_index)
js = np.array(list(active_indices(eff_file_index)))
record_particles(eff_file_index,js)

if i == T-1:
#On last step return the matrix and dont continue the integration
return M #matrix of form shape = (N, T, T, 3)

#INTEGRATION

#read next vlsvfile for time
next_file = file_index + 1 if direction == "+" else file_index - 1
t_1 = vlsvTObject.ts[next_file]

#time between files
dt = abs(t_1-t_0)

#indexing
ns = np.arange(N)
n_idx, j_idx = np.meshgrid(ns,js,indexing = "ij")
n_idx = n_idx.ravel()
j_idx = j_idx.ravel()

"""
How integration step size is determined
bulk_n dt bulk_n+1
|----------------------------|
dt_step dt_step dt_step (dt-t_elapsed)
|-------|-------|-------|----|
"""
#number of integration steps
steps = math.ceil(dt/dt_step)
t_elapsed = 0.0
prev_record_idx = 0

#integration loop
for step in range(steps):

#t_0: current file time, t_elapsed*sign: elapsed time toward next file
t_step = t_0 + t_elapsed*sign

#determining dt_step size
actual_dt = min(dt_step, dt-t_elapsed)

#integration step
integration_step(t_step, actual_dt, n_idx, j_idx)

#keeping track of the elapsed time
t_elapsed += actual_dt

#this leaves nans if points_per too big and dt_step is too big relative to dt
record_idx = int((t_elapsed)/dt*points_per)

#checking when to record particles
if record_idx != prev_record_idx and record_idx < points_per:

prev_record_idx = record_idx

eff_idx = eff_file_index + record_idx*sign

#adding seed points between timesteps
inject_particles(eff_idx)
js = np.array(list(active_indices(eff_idx)))
record_particles(eff_idx, js)

#preps indeces for next loop
n_idx, j_idx = np.meshgrid(ns,js, indexing="ij")
n_idx = n_idx.ravel()
j_idx = j_idx.ravel()

#set next points as current points for the next looping
t_0 = t_1

#Back up return M

return M #matrix of form shape = (N, T_eff, T_eff, 3)

def _time_to_idx(self, time):
"""
From know dt, t range, and points_per can determine
for a given time the index in M
"""
if (time < self.time_range[0]) or (time> self.time_range[1]):
raise ValueError(f"Time not with the analyzed time range of {self.time_range[0]}-{self.time_range[1]}")
T_eff = self.M.shape[1]
frac = (time-self.time_range[0])/(self.time_range[1]-self.time_range[0])
idx = int(round(frac*(T_eff-1)))
return idx

def get_pathline(self, seed_idx, time):
"""
Get the pathline of a seed point released at a given time
time input --> return closest pathline
"""
release_idx = self._time_to_idx(time=time)
pathline = self.M[seed_idx, :, release_idx, :]

return pathline

def get_streakline(self, seed_idx, time):
"""
Get the streakline between the start time and a given time

time input --> return closest streakline
"""
time_idx = self._time_to_idx(time=time)
streakline = self.M[seed_idx, time_idx, :, : ]

return streakline