Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .devcontainer/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ FROM mcr.microsoft.com/devcontainers/miniconda:0-3

# commenting out the mamba install, as it gives an error, see:
# https://github.com/conda/conda-libmamba-solver/issues/540
RUN conda install -n base -c conda-forge mamba
RUN conda install -n base -c conda-forge

# Copy environment.yml (if found) to a temp location so we update the environment. Also
# copy "noop.txt" so the COPY instruction does not fail if no environment.yml exists.
Expand Down
100 changes: 63 additions & 37 deletions hsds/headnode.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ def is_healthy(self):
async def isClusterReady(app):
sn_count = 0
dn_count = 0
active_sn_ids = app["active_sn_ids"]
active_dn_ids = app["active_dn_ids"]
target_sn_count = await getTargetNodeCount(app, "sn")
target_dn_count = await getTargetNodeCount(app, "dn")
last_create_time = None
Expand All @@ -115,9 +117,11 @@ async def isClusterReady(app):
if last_create_time is None or node.create_time > last_create_time:
last_create_time = node.create_time
if node.type == "sn":
sn_count += 1
if node_id in active_sn_ids:
sn_count += 1
else:
dn_count += 1
if node_id in active_dn_ids:
dn_count += 1
if sn_count == 0 or dn_count == 0:
log.debug("no nodes, cluster not ready")
return False
Expand Down Expand Up @@ -171,6 +175,20 @@ async def info(request):
return resp


def getNodeUrls(nodes, node_ids):
    """Build the list of urls matching node_ids position by position.

    Every truthy id is looked up in nodes and rendered as
    "http://<host>:<port>" from that node's host and port attributes.
    A falsy id (such as None) produces None at the same index, so the
    returned list always has the same length as node_ids.
    """

    def _url_for(slot_id):
        # falsy ids map to None so positions stay aligned with node_ids
        if not slot_id:
            return None
        entry = nodes[slot_id]
        return f"http://{entry.host}:{entry.port}"

    return [_url_for(slot_id) for slot_id in node_ids]


async def register(request):
"""HTTP method for nodes to register with head node"""
app = request.app
Expand Down Expand Up @@ -208,7 +226,7 @@ async def register(request):
log.debug("register - get ip/port from request.transport")
peername = request.transport.get_extra_info("peername")
if peername is None:
msg = "Can not determine caller IP"
msg = "Cannot determine caller IP"
log.error(msg)
raise HTTPBadRequest(reason=msg)
if peername[0] is None or peername[0] in ("::1", "127.0.0.1"):
Expand Down Expand Up @@ -255,10 +273,34 @@ async def register(request):
node_host=node_host,
node_port=node_port,
)
# delete any existing node with the same port
# delete any existing node with the same port and IP
removeNode(app, host=node_host, port=node_port)
nodes[node_id] = node

# add to the active list if there's an open slot
if node_type == "sn":
active_list = app["active_sn_ids"]
else:
active_list = app["active_dn_ids"]

tgt_count = len(active_list)
active_count = sum(id is not None for id in active_list)
if tgt_count == active_count:
# all the slots are filled, see if there is any unhealthy node
# and remove that
for i in range(len(active_list)):
id = active_list[i]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid using id which shadows the built-in function. Use a more descriptive name like node_id for clarity.

node = nodes[id]
if not node.is_healthy():
active_list[i] = None # clear the slot
break

for i in range(len(active_list)):
if not active_list[i]:
log.info(f"Node {node_id} added to {node_type} active list in slot: {i}")
active_list[i] = node_id
break

resp = StreamResponse()
resp.headers["Content-Type"] = "application/json"
answer = {}
Expand All @@ -267,38 +309,14 @@ async def register(request):
answer["cluster_state"] = "READY"
else:
answer["cluster_state"] = "WAITING"
sn_urls = []
dn_urls = []
sn_ids = []
dn_ids = []
for node_id in nodes:
node = nodes[node_id]
if not node.is_healthy():
continue
node_url = f"http://{node.host}:{node.port}"
if node.type == "sn":
sn_urls.append(node_url)
sn_ids.append(node_id)
else:
dn_urls.append(node_url)
dn_ids.append(node_id)

# sort dn_urls so node number can be determined
dn_id_map = {}
for i in range(len(dn_urls)):
dn_url = dn_urls[i]
dn_id = dn_ids[i]
dn_id_map[dn_url] = dn_id

dn_urls.sort()
dn_ids = [] # re-arrange to match url order
for dn_url in dn_urls:
dn_ids.append(dn_id_map[dn_url])
sn_urls = getNodeUrls(nodes, app["active_sn_ids"])
dn_urls = getNodeUrls(nodes, app["active_dn_ids"])

answer["sn_ids"] = app["active_sn_ids"]
answer["sn_urls"] = sn_urls
answer["dn_ids"] = app["active_dn_ids"]
answer["dn_urls"] = dn_urls
answer["sn_ids"] = sn_ids
answer["dn_ids"] = dn_ids
answer["req_ip"] = node_host
log.debug(f"register returning: {answer}")
app["last_health_check"] = int(time.time())
Expand Down Expand Up @@ -410,7 +428,7 @@ async def nodeinfo(request):
async def getTargetNodeCount(app, node_type):

if node_type == "dn":
key = "target_sn_count"
key = "target_dn_count"
elif node_type == "sn":
key = "target_sn_count"
else:
Expand All @@ -430,7 +448,12 @@ async def getTargetNodeCount(app, node_type):
def getActiveNodeCount(app, node_type):
count = 0
nodes = app["nodes"]
for node_id in nodes:
if node_type == "sn":
active_list = app["active_sn_ids"]
else:
active_list = app["active_dn_ids"]

for node_id in active_list:
node = nodes[node_id]
if node.type != node_type:
continue
Expand Down Expand Up @@ -462,8 +485,6 @@ async def init():

app["head_port"] = config.get("head_port")

nodes = {}

# check to see if we are running in a DCOS cluster
if "MARATHON_APP_ID" in os.environ:
msg = "Found MARATHON_APP_ID environment variable, setting "
Expand All @@ -473,7 +494,12 @@ async def init():
else:
log.info("not setting is_dcos")

app["nodes"] = nodes
target_sn_count = await getTargetNodeCount(app, "sn")
target_dn_count = await getTargetNodeCount(app, "dn")

app["nodes"] = {}
app["active_sn_ids"] = [None, ] * target_sn_count
app["active_dn_ids"] = [None, ] * target_dn_count
app["dead_node_ids"] = set()
app["start_time"] = int(time.time()) # seconds after epoch
app["last_health_check"] = 0
Expand Down
2 changes: 1 addition & 1 deletion hsds/util/azureBlobClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ async def get_object(self, key, bucket=None, offset=0, length=-1):
if isinstance(e, AzureError):
if e.status_code == 404:
msg = f"storage key: {key} not found "
log.warn(msg)
log.info(msg)
raise HTTPNotFound()
elif e.status_code in (401, 403):
msg = f"azureBlobClient.access denied for get key: {key}"
Expand Down
2 changes: 1 addition & 1 deletion hsds/util/fileClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ async def get_object(self, key, bucket=None, offset=0, length=-1):
log.info(msg)
except FileNotFoundError:
msg = f"fileClient: {key} not found "
log.warn(msg)
log.info(msg)
raise HTTPNotFound()
except IOError as ioe:
msg = f"fileClient: IOError reading {bucket}/{key}: {ioe}"
Expand Down
Loading