Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docker-compose-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ services:
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-fabric}
- PGDATA=${PGDATA:-/var/lib/postgresql/data}
neo4j1:
image: fabrictestbed/neo4j-apoc:5.3.0
#image: fabrictestbed/neo4j-apoc:5.3.0
image: kthare10/neo4j-apoc:5.3.0
container_name: neo4j1
user: ${NEO4J_UID:-1000}:${NEO4J_GID:-1000}
restart: always
Expand Down
5 changes: 3 additions & 2 deletions fabric_cf/actor/core/policy/broker_simpler_units_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -1145,7 +1145,7 @@ def __allocate_services(self, *, rid: ID, inv: NetworkServiceInventory, sliver:
interface_type=InterfaceType.TrunkPort)

if net_cp is None:
error_msg = "Peer Connection Point not found from Network AM"
error_msg = f"Peer Connection Point not found from Network AM: {site_cp.node_id}"
raise BrokerException(msg=error_msg)

self.logger.debug(f"Peer Interface Sliver [Network Delegation] (A): {net_cp}")
Expand Down Expand Up @@ -2295,7 +2295,8 @@ def get_peer_interface_sliver(self, *, site_ifs_id: str, interface_type: Interfa
"""
try:
self.lock.acquire()
peer_interfaces = FimHelper.get_peer_interfaces(ifs_node_id=site_ifs_id, graph=self.combined_broker_model,
peer_interfaces = FimHelper.get_peer_interfaces(ifs_node_id=site_ifs_id,
graph=self.combined_broker_model,
interface_type=interface_type)

if len(peer_interfaces) == 0:
Expand Down
395 changes: 314 additions & 81 deletions fabric_cf/actor/fim/fim_helper.py

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions fabric_cf/actor/fim/plugins/broker/aggregate_bqm_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ def plug_produce_bqm(self, *, cbm: ABCCBMPropertyGraph, **kwargs) -> ABCBQMPrope
p4_sliver.capacities = Capacities()
p4_sliver.capacity_allocations = Capacities()
p4_sliver.capacities += p4.get_capacities()
if not self.DEBUG_FLAG:
if not self.DEBUG_FLAG and kwargs['query_level'] != 0:
# query database for everything taken on this node
allocated_caps, allocated_comp_caps = self.occupied_node_capacity(db=db, node_id=p4.node_id,
start=start, end=end)
Expand Down Expand Up @@ -529,7 +529,7 @@ def plug_produce_bqm(self, *, cbm: ABCCBMPropertyGraph, **kwargs) -> ABCBQMPrope
ABCPropertyGraph.PROP_CAPACITY_ALLOCATIONS]
elif cbm_link_props.get(ABCPropertyGraph.PROP_CAPACITIES):
new_link_props[ABCPropertyGraph.PROP_CAPACITIES] = cbm_link_props[ABCPropertyGraph.PROP_CAPACITIES]
if not self.DEBUG_FLAG:
if not self.DEBUG_FLAG and kwargs['query_level'] != 0:
occupied_link_capacity = self.occupied_link_capacity(node_id=link, db=db, start=start, end=end)
if occupied_link_capacity:
new_link_props[ABCPropertyGraph.PROP_CAPACITY_ALLOCATIONS] = occupied_link_capacity
Expand Down Expand Up @@ -654,7 +654,7 @@ def plug_produce_bqm(self, *, cbm: ABCCBMPropertyGraph, **kwargs) -> ABCBQMPrope
elif fac_link_props.get(ABCPropertyGraph.PROP_CAPACITIES):
new_link_props[ABCPropertyGraph.PROP_CAPACITIES] = fac_link_props[
ABCPropertyGraph.PROP_CAPACITIES]
if not self.DEBUG_FLAG:
if not self.DEBUG_FLAG and kwargs['query_level'] != 0:
occupied_link_capacity = self.occupied_link_capacity(db=db, node_id=fac_link_id,
start=start, end=end)
if occupied_link_capacity:
Expand Down
7 changes: 6 additions & 1 deletion fabric_cf/orchestrator/core/bqm_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
# Author: Komal Thareja (kthare10@renci.org)
from datetime import datetime, timezone

from fabric_cf.actor.core.common.constants import Constants
from fabric_cf.actor.fim.fim_helper import FimHelper
from fim.user import GraphFormat

Expand All @@ -37,7 +38,11 @@ def __init__(self):
self.graph_format = None
self.bqm = None
self.last_query_time = None
self.refresh_interval_in_seconds = 60
from fabric_cf.actor.core.container.globals import GlobalsSingleton
config = GlobalsSingleton.get().get_config()
bqm_config = config.get_global_config().get_bqm_config()
refresh_interval = bqm_config.get(Constants.REFRESH_INTERVAL, 2000)
self.refresh_interval_in_seconds = refresh_interval
self.refresh_in_progress = False
self.level = 1
self.graph_id = None
Expand Down
16 changes: 10 additions & 6 deletions fabric_cf/orchestrator/core/orchestrator_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def discover_broker_query_model(self, *, controller: ABCMgmtControllerMixin, tok
broker_query_model = model.get_model()

# Do not update cache for advance requests
if not start and not end and not includes and not excludes:
if not start and not end and not includes and not excludes and level > 0:
self.controller_state.save_bqm(bqm=broker_query_model, graph_format=graph_format, level=level)

return broker_query_model
Expand Down Expand Up @@ -326,7 +326,6 @@ def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key
lease_start_time=lease_start_time,
lease_end_time=lease_end_time,
lifetime=lifetime)
new_slice_object.update_topology(topology=topology)

# Check if Testbed in Maintenance or Site in Maintenance
self.check_maintenance_mode(token=fabric_token, reservations=computed_reservations)
Expand All @@ -338,6 +337,7 @@ def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key
# Enqueue future slices on Advanced Scheduling Thread to determine possible start time
# Determining start time may take time so this is done asynchronously to avoid increasing response time
# of create slice API
new_slice_object.update_topology(topology=topology)
self.controller_state.get_advance_scheduling_thread().queue_slice(controller_slice=new_slice_object)
else:
# Enqueue the slice on the demand thread
Expand All @@ -346,6 +346,7 @@ def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key

# Add Reservations to relational database;
new_slice_object.add_reservations()
new_slice_object.update_topology(topology=topology)
self.logger.info(f"OC wrapper: TIME= {time.time() - create_ts:.0f}")
self.controller_state.get_defer_thread().queue_slice(controller_slice=new_slice_object)
self.logger.info(f"QU queue: TIME= {time.time() - create_ts:.0f}")
Expand Down Expand Up @@ -511,7 +512,6 @@ def modify_slice(self, *, token: str, slice_id: str, slice_graph: str) -> List[d

# Compute the reservations
topology_diff, computed_reservations = slice_object.modify(new_slice_graph=asm_graph)
slice_object.update_topology(topology=topology)

# Check if Test Bed or site is in maintenance
self.check_maintenance_mode(token=fabric_token, reservations=computed_reservations)
Expand All @@ -521,10 +521,14 @@ def modify_slice(self, *, token: str, slice_id: str, slice_graph: str) -> List[d

# Slice has sliver modifications - add/remove/update for slivers requiring AM updates
modify_state = slice_object.has_sliver_updates_at_authority()
FimHelper.delete_graph(graph_id=slice_obj.get_graph_id())
graph_id = asm_graph.get_graph_id()
meta_data_updates = slice_object.has_meta_data_updates(topology_diff=topology_diff)

if topology_diff is not None and (modify_state or meta_data_updates):
slice_object.update_topology(topology=topology)
FimHelper.delete_graph(graph_id=slice_obj.get_graph_id())
graph_id = asm_graph.get_graph_id()
slice_obj.graph_id = graph_id

slice_obj.graph_id = graph_id
config_props = slice_obj.get_config_properties()
config_props[Constants.PROJECT_ID] = project
config_props[Constants.TAGS] = ','.join(tags)
Expand Down
14 changes: 5 additions & 9 deletions fabric_cf/orchestrator/core/orchestrator_kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def get_saved_bqm(self, *, graph_format: GraphFormat, level: int) -> BqmWrapper:
try:
self.lock.acquire()
key = f"{graph_format}-{level}"
saved_bqm = self.bqm_cache.get(key, None)
saved_bqm = self.bqm_cache.get(key)
return saved_bqm
finally:
self.lock.release()
Expand All @@ -97,18 +97,14 @@ def save_bqm(self, *, bqm: str, graph_format: GraphFormat, level: int):
try:
self.lock.acquire()
key = f"{graph_format}-{level}"
saved_bqm = self.bqm_cache.get(key, None)
saved_bqm = self.bqm_cache.get(key)
if saved_bqm is None:
from fabric_cf.actor.core.container.globals import GlobalsSingleton
refresh_interval = GlobalsSingleton.get().get_config().get_global_config().get_bqm_config().get(
Constants.REFRESH_INTERVAL, None)
saved_bqm = BqmWrapper()
saved_bqm.set_refresh_interval(refresh_interval=int(refresh_interval))
saved_bqm.save(bqm=bqm, graph_format=graph_format, level=level)
self.bqm_cache[key] = saved_bqm

if level == 0:
self.load_model(model=bqm)
#if level == 0:
# self.load_model(model=bqm)
finally:
self.lock.release()

Expand Down Expand Up @@ -199,7 +195,7 @@ def start_threads(self):
force_refresh=True, level=0)
self.load_model(model=model)

self.get_logger().debug("Starting SliceDeferThread")
self.get_logger().info("Starting SliceDeferThread")
self.defer_thread = SliceDeferThread(kernel=self)
self.defer_thread.start()
self.event_processor = EventProcessor(name="PeriodicProcessor", logger=self.logger)
Expand Down
34 changes: 32 additions & 2 deletions fabric_cf/orchestrator/core/orchestrator_slice_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from fim.slivers.network_node import NodeSliver, NodeType
from fim.slivers.network_service import NetworkServiceSliver
from fim.slivers.topology_diff import WhatsModifiedFlag, TopologyDiff
from fim.user import ServiceType, ExperimentTopology, InterfaceType
from fim.user import ServiceType, ExperimentTopology, InterfaceType, ReservationInfo

from fabric_cf.actor.core.common.constants import ErrorCodes, Constants
from fabric_cf.actor.core.kernel.reservation_states import ReservationPendingStates, ReservationStates
Expand Down Expand Up @@ -148,7 +148,12 @@ def add_reservations(self):
start = time.time()
# Add Network Node reservations
for r in self.computed_add_reservations:
self.controller.add_reservation(reservation=r)
res_id = self.controller.add_reservation(reservation=r)
sliver = r.get_sliver()
sliver.reservation_info = ReservationInfo()
sliver.reservation_info.reservation_id = str(res_id)
sliver.reservation_info.reservation_state = str(ReservationStates.Nascent)

self.logger.info(f"ADD TIME: {time.time() - start:.0f}")

def create(self, *, slice_graph: ABCASMPropertyGraph, lease_start_time: datetime = None,
Expand Down Expand Up @@ -775,3 +780,28 @@ def has_topology_diffs(self, *, topology_diff: TopologyDiff) -> bool:

self.logger.debug(f"Topology diff found: {ret_val}")
return ret_val

def has_meta_data_updates(self, *, topology_diff: TopologyDiff) -> bool:
"""
Check if there are any User Data updates in the given topology difference.

:param topology_diff: TopologyDiff object containing modifications.
:return: True if any node, component, interface, or service has USER_DATA updated.
"""
if not topology_diff:
return False

modified = topology_diff.modified

for collection in (
modified.nodes,
modified.components,
modified.interfaces,
modified.services,
):
if any(flag & WhatsModifiedFlag.USER_DATA for _, flag in collection):
self.logger.debug("Topology diff found with User Data Update: True")
return True

self.logger.debug("Topology diff found with User Data Update: False")
return False
4 changes: 1 addition & 3 deletions fabric_cf/orchestrator/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ services:
- frontend
- backend
ports:
- 0.0.0.0:443:443
- 443:443
volumes:
- ./nginx/default.conf:/etc/nginx/conf.d/default.conf
- ./certs/fullchain.pem:/etc/ssl/public.pem
Expand All @@ -21,7 +21,6 @@ services:
restart: always
user: ${NEO4J_UID:-1000}:${NEO4J_GID:-1000}
networks:
- frontend
- backend
volumes:
- ${NEO4J_DATA_PATH_HOST:-$(pwd)/neo4j/data}:${NEO4J_DATA_PATH_DOCKER:-/data}
Expand Down Expand Up @@ -75,7 +74,6 @@ services:
- database
- neo4j
networks:
- frontend
- backend
volumes:
- ./neo4j:/usr/src/app/neo4j
Expand Down
14 changes: 14 additions & 0 deletions tools/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ def export(self):
site_name = None
host_name = None
ip_subnet = None
ip_v4 = None
ip_v6 = None
core = None
ram = None
disk = None
Expand All @@ -178,6 +180,16 @@ def export(self):
site_name = sliver.get_site()
if sliver.get_gateway():
ip_subnet = str(sliver.get_gateway().subnet)
if sliver.labels and sliver.labels.ipv4:
if isinstance(sliver.labels.ipv4, list):
ip_v4 = str(sliver.labels.ipv4[0])
else:
ip_v4 = str(sliver.labels.ipv4)
if sliver.labels and sliver.labels.ipv6:
if isinstance(sliver.labels.ipv4, list):
ip_v6 = str(sliver.labels.ipv6[0])
else:
ip_v6 = str(sliver.labels.ipv6)
if sliver.capacities:
bw = sliver.capacities.bw

Expand All @@ -195,6 +207,8 @@ def export(self):
"state": reservation.get_state().name.lower(),
"sliver_type": str(reservation.get_type()).lower(),
"ip_subnet": ip_subnet,
"ip_v4": ip_v4,
"ip_v6": ip_v6,
"error": error_message,
"image": image,
"core": core,
Expand Down