From ff3206ab17535d0f35879f0a60f0743383b1850d Mon Sep 17 00:00:00 2001 From: Sahil Lenka Date: Thu, 5 Feb 2026 02:19:11 +0530 Subject: [PATCH 1/2] fix: add atexit and signal handlers for ZMQ cleanup - Register terminate_zmq() with atexit to ensure cleanup on normal exit - Add signal handlers for SIGINT (Ctrl+C) and SIGTERM - Improve terminate_zmq() with better logging and error handling - Clear zmq_ports dict after cleanup to prevent double-cleanup --- concore.py | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/concore.py b/concore.py index 2da1250..4072505 100644 --- a/concore.py +++ b/concore.py @@ -6,6 +6,9 @@ import re import zmq import numpy as np +import atexit +import signal + logging.basicConfig( level=logging.INFO, format='%(levelname)s - %(message)s' @@ -98,12 +101,31 @@ def init_zmq_port(port_name, port_type, address, socket_type_str): logging.error(f"An unexpected error occurred during ZMQ port initialization for {port_name}: {e}") def terminate_zmq(): - for port in zmq_ports.values(): + """Clean up all ZMQ sockets and contexts before exit.""" + if not zmq_ports: + return # No ports to clean up + + print("\nCleaning up ZMQ resources...") + for port_name, port in zmq_ports.items(): try: port.socket.close() port.context.term() + print(f"Closed ZMQ port: {port_name}") except Exception as e: logging.error(f"Error while terminating ZMQ port {port.address}: {e}") + zmq_ports.clear() + +def signal_handler(sig, frame): + """Handle interrupt signals gracefully.""" + print(f"\nReceived signal {sig}, shutting down gracefully...") + terminate_zmq() + sys.exit(0) + +# Register cleanup handlers +atexit.register(terminate_zmq) +signal.signal(signal.SIGINT, signal_handler) # Handle Ctrl+C +signal.signal(signal.SIGTERM, signal_handler) # Handle termination + # --- ZeroMQ Integration End --- From b3268f371a8c281e396f9d36de5d2f153a7e6227 Mon Sep 17 00:00:00 2001 From: Sahil Lenka Date: Thu, 5 Feb 2026 02:38:31 +0530 Subject: [PATCH 2/2] fix: address Copilot review, prevent reentrant cleanup and platform-specific SIGTERM --- concore.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/concore.py b/concore.py index 4072505..6d1763c 100644 --- a/concore.py +++ b/concore.py @@ -75,6 +75,7 @@ def recv_json_with_retry(self): # Global ZeroMQ ports registry zmq_ports = {} +_cleanup_in_progress = False def init_zmq_port(port_name, port_type, address, socket_type_str): """ @@ -102,9 +103,15 @@ def init_zmq_port(port_name, port_type, address, socket_type_str): def terminate_zmq(): """Clean up all ZMQ sockets and contexts before exit.""" + global _cleanup_in_progress + + if _cleanup_in_progress: + return # Already cleaning up, prevent reentrant calls + if not zmq_ports: return # No ports to clean up + _cleanup_in_progress = True print("\nCleaning up ZMQ resources...") for port_name, port in zmq_ports.items(): try: @@ -114,17 +121,25 @@ def terminate_zmq(): except Exception as e: logging.error(f"Error while terminating ZMQ port {port.address}: {e}") zmq_ports.clear() + _cleanup_in_progress = False def signal_handler(sig, frame): """Handle interrupt signals gracefully.""" print(f"\nReceived signal {sig}, shutting down gracefully...") + # Prevent terminate_zmq from being called twice: once here and once via atexit + try: + atexit.unregister(terminate_zmq) + except Exception: + # If unregister fails for any reason, proceed with explicit cleanup anyway + pass terminate_zmq() sys.exit(0) # Register cleanup handlers atexit.register(terminate_zmq) signal.signal(signal.SIGINT, signal_handler) # Handle Ctrl+C -signal.signal(signal.SIGTERM, signal_handler) # Handle termination +if not hasattr(sys, 'getwindowsversion'): + signal.signal(signal.SIGTERM, signal_handler) # Handle termination (Unix only) # --- ZeroMQ Integration End ---