diff --git a/concore.py b/concore.py index 2da1250..6d1763c 100644 --- a/concore.py +++ b/concore.py @@ -6,6 +6,9 @@ import re import zmq import numpy as np +import atexit +import signal + logging.basicConfig( level=logging.INFO, format='%(levelname)s - %(message)s' @@ -72,6 +75,7 @@ def recv_json_with_retry(self): # Global ZeroMQ ports registry zmq_ports = {} +_cleanup_in_progress = False def init_zmq_port(port_name, port_type, address, socket_type_str): """ @@ -98,12 +102,45 @@ def init_zmq_port(port_name, port_type, address, socket_type_str): logging.error(f"An unexpected error occurred during ZMQ port initialization for {port_name}: {e}") def terminate_zmq(): - for port in zmq_ports.values(): + """Clean up all ZMQ sockets and contexts before exit.""" + global _cleanup_in_progress + + if _cleanup_in_progress: + return # Already cleaning up, prevent reentrant calls + + if not zmq_ports: + return # No ports to clean up + + _cleanup_in_progress = True + print("\nCleaning up ZMQ resources...") + for port_name, port in zmq_ports.items(): try: port.socket.close() port.context.term() + print(f"Closed ZMQ port: {port_name}") except Exception as e: logging.error(f"Error while terminating ZMQ port {port.address}: {e}") + zmq_ports.clear() + _cleanup_in_progress = False + +def signal_handler(sig, frame): + """Handle interrupt signals gracefully.""" + print(f"\nReceived signal {sig}, shutting down gracefully...") + # Prevent terminate_zmq from being called twice: once here and once via atexit + try: + atexit.unregister(terminate_zmq) + except Exception: + # If unregister fails for any reason, proceed with explicit cleanup anyway + pass + terminate_zmq() + sys.exit(0) + +# Register cleanup handlers +atexit.register(terminate_zmq) +signal.signal(signal.SIGINT, signal_handler) # Handle Ctrl+C +if not hasattr(sys, 'getwindowsversion'): + signal.signal(signal.SIGTERM, signal_handler) # Handle termination (Unix only) + # --- ZeroMQ Integration End ---