Skip to content

Commit

Permalink
recreate the internal proxy to the local Supervisor if older than 20 …
Browse files Browse the repository at this point in the history
…minutes
  • Loading branch information
julien6387 committed Jun 5, 2024
1 parent 1086c21 commit f7c5626
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 6 deletions.
11 changes: 8 additions & 3 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
# Change Log

## 0.18.3 (2024-xx-xx)
## 0.18.3 (2024-06-05)

* TODO
* Recreate the internal proxy to the local Supervisor if older than 20 minutes, in order to work around the closure
of all channels that have been inactive for 30 minutes in the Supervisor HTTP server.
This fixes a rare bug that was introduced in **Supvisors** 0.17, and whose occurrence has greatly
increased since the refactoring of the **Supvisors** 0.18 internal communications.

* Swap Memory and Network statistics cards in the **Supvisors** Web UI.


## 0.18.2 (2024-05-27)

* Handle PermissionError exception when trying to get disk usage.
* Handle the `PermissionError` exception when trying to get disk usage.

* Update CSS for overflow parts in the **Supvisors** Web UI.

Expand Down
29 changes: 28 additions & 1 deletion supvisors/internal_com/supervisorproxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import json
import queue
import threading
import time
import traceback
from enum import Enum
from http.client import HTTPException
Expand All @@ -36,6 +37,10 @@
# List of keys useful to build a SupvisorsState event
StateModesKeys = ['fsm_statecode', 'discovery_mode', 'master_identifier', 'starting_jobs', 'stopping_jobs']

# maximum idle time (in seconds) before the proxy to the local Supervisor is re-created
# Supervisor closes any HTTP channel after 30 minutes without activity
LOCAL_PROXY_DURATION = 20 * 60


class InternalEventHeaders(Enum):
""" Event type for deferred XML-RPCs. """
Expand All @@ -55,7 +60,8 @@ def __init__(self, status: SupvisorsInstanceStatus, supvisors: Any):
self.status: SupvisorsInstanceStatus = status
self.supvisors = supvisors
# create an XML-RPC client to the local Supervisor instance
self.proxy = self._get_proxy()
self._proxy = self._get_proxy()
self.last_used: float = time.monotonic()

@property
def logger(self) -> Logger:
Expand All @@ -67,6 +73,23 @@ def local_identifier(self) -> str:
""" Get the local Supvisors instance identifier. """
return self.supvisors.mapper.local_identifier

@property
def proxy(self) -> Any:
    """ Get the proxy to the Supervisor instance, renewing it first when it is the local one and it is stale.

    WARN: The proxy to the local Supervisor is a LOT less used than the others and is really subject to be broken
    by the http_channel.kill_zombies (supervisor/medusa/http_server.py) that will close the channel after
    30 minutes of inactivity (magic number).
    So the local proxy is re-created as soon as it has been left unused for more than LOCAL_PROXY_DURATION
    seconds (last_used is refreshed on every xml_rpc call, so a regularly used proxy is never re-created).
    The proxies to the other Supvisors instances are never re-created here; according to the original author's
    note, they are kept alive by the TICK publication.

    :return: the object returned by _get_proxy (created at construction time, or just re-created)
    """
    # only the proxy to the local Supervisor is subject to renewal
    is_local = self.status.supvisors_id.identifier == self.local_identifier
    if is_local and time.monotonic() - self.last_used > LOCAL_PROXY_DURATION:
        self.logger.debug('SupervisorProxy.proxy: recreate local Supervisor proxy')
        self._proxy = self._get_proxy()
        # restart the idle period, so that the new proxy is not re-created again on the next access
        self.last_used = time.monotonic()
    return self._proxy

def _get_proxy(self):
""" Get the proxy corresponding to the Supervisor identifier. """
instance_id = self.status.supvisors_id
Expand All @@ -81,6 +104,9 @@ def _get_origin(self, from_identifier: str) -> Tuple[str, Ipv4Address]:

def xml_rpc(self, fct_name: str, fct, args):
""" Common exception handling on XML-RPC methods. """
# reset the proxy usage time
self.last_used = time.monotonic()
# call the XML-RPC
try:
return fct(*args)
except RPCError as exc:
Expand All @@ -92,6 +118,7 @@ def xml_rpc(self, fct_name: str, fct, args):
except (OSError, HTTPException) as exc:
# transport issue due to network or remote Supervisor failure (includes a bunch of exceptions, such as
# socket.gaierror, ConnectionResetError, ConnectionRefusedError, CannotSendRequest, IncompleteRead, etc.)
# also raised if the HTTP channel has been closed by the Supervisor kill_zombies (30 minutes inactivity)
# the proxy is not operational - error log only if instance is active
log_level = LevelsByName.ERRO if self.status.has_active_state() else LevelsByName.DEBG
message = f'SupervisorProxy.xml_rpc: Supervisor={self.status.usage_identifier} not reachable - {str(exc)}'
Expand Down
32 changes: 31 additions & 1 deletion supvisors/tests/test_supervisorproxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import http.client
import socket
import time
from unittest.mock import call, patch, Mock, DEFAULT

import pytest
Expand Down Expand Up @@ -66,13 +65,33 @@ def test_proxy_creation(mocked_rpc, proxy, supvisors):
assert proxy.supvisors is supvisors
assert proxy.status is supvisors.context.instances['10.0.0.1:25000']
assert proxy.proxy is not None
assert 0.0 < proxy.last_used < time.monotonic()
assert proxy.logger is supvisors.logger
assert proxy.local_identifier == supvisors.mapper.local_identifier
assert mocked_rpc.call_args_list == [call({'SUPERVISOR_SERVER_URL': 'http://10.0.0.1:25000',
'SUPERVISOR_USERNAME': 'user',
'SUPERVISOR_PASSWORD': 'p@$$w0rd'})]


def test_proxy_proxy(supvisors, proxy):
    """ Check that the proxy property only renews a stale proxy to the local Supervisor. """
    # case 1: the proxy targets a non-local Supervisor, so it is returned untouched
    assert proxy.status.supvisors_id.identifier != proxy.local_identifier
    initial_proxy, initial_usage = proxy._proxy, proxy.last_used
    assert proxy.proxy is initial_proxy
    assert proxy.last_used == initial_usage
    # case 2: the proxy targets the local Supervisor but has been used recently, so it is still untouched
    proxy.status = supvisors.context.local_status
    assert proxy.status.supvisors_id.identifier == proxy.local_identifier
    assert proxy.proxy is initial_proxy
    assert proxy.last_used == initial_usage
    # case 3: the proxy targets the local Supervisor and its last usage is too old
    # (the identity of the new proxy cannot be checked because of the patch in place)
    stale_date = time.monotonic() - LOCAL_PROXY_DURATION - 1
    proxy.last_used = stale_date
    assert proxy.proxy
    assert proxy.last_used > initial_usage


def test_get_origin(supvisors, proxy):
""" Test the SupervisorProxy._get_origin method. """
local_instance = supvisors.mapper.local_instance
Expand All @@ -82,28 +101,39 @@ def test_get_origin(supvisors, proxy):

def test_proxy_xml_rpc(supvisors, proxy):
    """ Check the exception handling and the usage time update of SupervisorProxy.xml_rpc. """
    previous_usage = proxy.last_used
    rpc_mock = Mock()
    # nominal calls with zero, one and two arguments: the arguments are forwarded as is
    for rpc_args in [(), ('hello',), ('hello', 28)]:
        proxy.xml_rpc('normal', rpc_mock, rpc_args)
        assert rpc_mock.call_args_list == [call(*rpc_args)]
        assert proxy.last_used > previous_usage
        previous_usage = proxy.last_used
        rpc_mock.reset_mock()
    # minor exception (remote Supvisors instance is operational): nothing is raised to the caller
    rpc_mock.side_effect = RPCError(code=58)
    proxy.xml_rpc('normal', rpc_mock, ('hello', 28))
    assert rpc_mock.call_args_list == [call('hello', 28)]
    assert proxy.last_used > previous_usage
    previous_usage = proxy.last_used
    rpc_mock.reset_mock()
    # major exceptions (remote Supvisors instance is NOT operational): all are turned into SupervisorProxyException
    major_failures = [OSError, HTTPException, xmlrpclib.Fault(77, 'fault'), KeyError, ValueError, TypeError]
    for failure in major_failures:
        rpc_mock.side_effect = failure
        with pytest.raises(SupervisorProxyException):
            proxy.xml_rpc('normal', rpc_mock, ('hello',))
        assert rpc_mock.call_args_list == [call('hello')]
        # the usage time is updated even when the call fails
        assert proxy.last_used > previous_usage
        previous_usage = proxy.last_used
        rpc_mock.reset_mock()


Expand Down
2 changes: 1 addition & 1 deletion supvisors/version.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.18.2
version=0.18.3

0 comments on commit f7c5626

Please sign in to comment.