diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index 3483135c..607d0882 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -2,7 +2,7 @@ FROM mcr.microsoft.com/devcontainers/miniconda:0-3 # commenting out mamba install, is given an error, see: # https://github.com/conda/conda-libmamba-solver/issues/540 -RUN conda install -n base -c conda-forge mamba +RUN conda install -n base -c conda-forge # Copy environment.yml (if found) to a temp location so we update the environment. Also # copy "noop.txt" so the COPY instruction does not fail if no environment.yml exists. diff --git a/hsds/headnode.py b/hsds/headnode.py index 9b49517d..55547294 100755 --- a/hsds/headnode.py +++ b/hsds/headnode.py @@ -103,6 +103,8 @@ def is_healthy(self): async def isClusterReady(app): sn_count = 0 dn_count = 0 + active_sn_ids = app["active_sn_ids"] + active_dn_ids = app["active_dn_ids"] target_sn_count = await getTargetNodeCount(app, "sn") target_dn_count = await getTargetNodeCount(app, "dn") last_create_time = None @@ -115,9 +117,11 @@ async def isClusterReady(app): if last_create_time is None or node.create_time > last_create_time: last_create_time = node.create_time if node.type == "sn": - sn_count += 1 + if node_id in active_sn_ids: + sn_count += 1 else: - dn_count += 1 + if node_id in active_dn_ids: + dn_count += 1 if sn_count == 0 or dn_count == 0: log.debug("no nodes, cluster not ready") return False @@ -171,6 +175,20 @@ async def info(request): return resp +def getNodeUrls(nodes, node_ids): + """ return a list of node urls for the given set of node ids """ + + node_urls = [] + for node_id in node_ids: + if node_id: + node = nodes[node_id] + node_url = f"http://{node.host}:{node.port}" + node_urls.append(node_url) + else: + node_urls.append(None) + return node_urls + + async def register(request): """HTTP method for nodes to register with head node""" app = request.app @@ -208,7 +226,7 @@ async def register(request): log.debug("register - get ip/port from request.transport") peername = request.transport.get_extra_info("peername") if peername is None: - msg = "Can not determine caller IP" + msg = "Cannot determine caller IP" log.error(msg) raise HTTPBadRequest(reason=msg) if peername[0] is None or peername[0] in ("::1", "127.0.0.1"): @@ -255,10 +273,34 @@ async def register(request): node_host=node_host, node_port=node_port, ) - # delete any existing node with the same port + # delete any existing node with the same port and IP removeNode(app, host=node_host, port=node_port) nodes[node_id] = node + # add to the active list if there's an open slot + if node_type == "sn": + active_list = app["active_sn_ids"] + else: + active_list = app["active_dn_ids"] + + tgt_count = len(active_list) + active_count = sum(id is not None for id in active_list) + if tgt_count == active_count: + # all the slots are filled, see if there is any unhealthy node + # and remove that + for i in range(len(active_list)): + id = active_list[i] + node = nodes[id] + if not node.is_healthy(): + active_list[i] = None # clear the slot + break + + for i in range(len(active_list)): + if not active_list[i]: + log.info(f"Node {node_id} added to {node_type} active list in slot: {i}") + active_list[i] = node_id + break + resp = StreamResponse() resp.headers["Content-Type"] = "application/json" answer = {} @@ -267,38 +309,14 @@ async def register(request): answer["cluster_state"] = "READY" else: answer["cluster_state"] = "WAITING" - sn_urls = [] - dn_urls = [] - sn_ids = [] - dn_ids = [] - for node_id in nodes: - node = nodes[node_id] - if not node.is_healthy(): - continue - node_url = f"http://{node.host}:{node.port}" - if node.type == "sn": - sn_urls.append(node_url) - sn_ids.append(node_id) - else: - dn_urls.append(node_url) - dn_ids.append(node_id) - # sort dn_urls so node number can be determined - dn_id_map = {} - for i in range(len(dn_urls)): - dn_url = dn_urls[i] - dn_id = dn_ids[i] - dn_id_map[dn_url] = dn_id - - dn_urls.sort() - dn_ids = [] # re-arrange to match url order - for dn_url in dn_urls: - dn_ids.append(dn_id_map[dn_url]) + sn_urls = getNodeUrls(nodes, app["active_sn_ids"]) + dn_urls = getNodeUrls(nodes, app["active_dn_ids"]) + answer["sn_ids"] = app["active_sn_ids"] answer["sn_urls"] = sn_urls + answer["dn_ids"] = app["active_dn_ids"] answer["dn_urls"] = dn_urls - answer["sn_ids"] = sn_ids - answer["dn_ids"] = dn_ids answer["req_ip"] = node_host log.debug(f"register returning: {answer}") app["last_health_check"] = int(time.time()) @@ -410,7 +428,7 @@ async def nodeinfo(request): async def getTargetNodeCount(app, node_type): if node_type == "dn": - key = "target_sn_count" + key = "target_dn_count" elif node_type == "sn": key = "target_sn_count" else: @@ -430,7 +448,12 @@ async def getTargetNodeCount(app, node_type): def getActiveNodeCount(app, node_type): count = 0 nodes = app["nodes"] - for node_id in nodes: + if node_type == "sn": + active_list = app["active_sn_ids"] + else: + active_list = app["active_dn_ids"] + + for node_id in active_list: node = nodes[node_id] if node.type != node_type: continue @@ -462,8 +485,6 @@ async def init(): app["head_port"] = config.get("head_port") - nodes = {} - # check to see if we are running in a DCOS cluster if "MARATHON_APP_ID" in os.environ: msg = "Found MARATHON_APP_ID environment variable, setting " @@ -473,7 +494,12 @@ async def init(): else: log.info("not setting is_dcos") - app["nodes"] = nodes + target_sn_count = await getTargetNodeCount(app, "sn") + target_dn_count = await getTargetNodeCount(app, "dn") + + app["nodes"] = {} + app["active_sn_ids"] = [None, ] * target_sn_count + app["active_dn_ids"] = [None, ] * target_dn_count app["dead_node_ids"] = set() app["start_time"] = int(time.time()) # seconds after epoch app["last_health_check"] = 0 diff --git a/hsds/util/azureBlobClient.py b/hsds/util/azureBlobClient.py index 6b22c869..ba9a2eab 100644 --- a/hsds/util/azureBlobClient.py +++ b/hsds/util/azureBlobClient.py @@ -134,7 +134,7 @@ async def get_object(self, key, bucket=None, offset=0, length=-1): if isinstance(e, AzureError): if e.status_code == 404: msg = f"storage key: {key} not found " - log.warn(msg) + log.info(msg) raise HTTPNotFound() elif e.status_code in (401, 403): msg = f"azureBlobClient.access denied for get key: {key}" diff --git a/hsds/util/fileClient.py b/hsds/util/fileClient.py index 1bc5e786..a9d77421 100644 --- a/hsds/util/fileClient.py +++ b/hsds/util/fileClient.py @@ -156,7 +156,7 @@ async def get_object(self, key, bucket=None, offset=0, length=-1): log.info(msg) except FileNotFoundError: msg = f"fileClient: {key} not found " - log.warn(msg) + log.info(msg) raise HTTPNotFound() except IOError as ioe: msg = f"fileClient: IOError reading {bucket}/{key}: {ioe}"