Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
ddff950
Client config (sdmi centric)
clauspruefer Jan 23, 2026
dcc4b34
Single DB-node docker file
clauspruefer Jan 23, 2026
337a478
Add run container shell script
clauspruefer Jan 24, 2026
d9b0ce0
Disable iptables in docker daemon config
clauspruefer Jan 24, 2026
8503e66
Remove unrelevant runtime sub-data
clauspruefer Jan 24, 2026
5692699
Add local orchestrator code
clauspruefer Jan 24, 2026
054b108
Add docker metadata
clauspruefer Jan 24, 2026
70a08d6
Update docker related
clauspruefer Jan 26, 2026
1f0da9c
Add service metadata templates
clauspruefer Jan 26, 2026
bc2cacd
Add main orchestrator
clauspruefer Jan 26, 2026
34c7f62
Update config
clauspruefer Jan 26, 2026
75b90c4
Add json-rpc server component
clauspruefer Jan 26, 2026
3676b5f
Add server start script
clauspruefer Jan 26, 2026
6af53de
Add ESB metadata / processing logic
clauspruefer Jan 26, 2026
d5d8c05
Replace jsocket with local cleaned-up (working) version
clauspruefer Jan 30, 2026
ee47dcd
Fix bugs
clauspruefer Jan 30, 2026
74a442e
Add missing default values
clauspruefer Jan 30, 2026
acdcb34
Update (bug fix, working with modified jsocket)
clauspruefer Jan 30, 2026
81999a1
Add missing default values
clauspruefer Jan 30, 2026
f3c316a
Disable iptables globally in docker-daemon
clauspruefer Jan 30, 2026
adbe82e
Update RPC model
clauspruefer Jan 31, 2026
2091db7
Add domain properties
clauspruefer Jan 31, 2026
938d19a
Update service call metadata
clauspruefer Jan 31, 2026
c2ff770
Update
clauspruefer Jan 31, 2026
cefb386
Add global sql queries
clauspruefer Feb 2, 2026
9bd2bbe
Fix path
clauspruefer Feb 2, 2026
0af58da
Remove unneeded modules
clauspruefer Feb 2, 2026
f97c08d
Add queries
clauspruefer Feb 5, 2026
87b7d7c
Increase max nodes
clauspruefer Feb 5, 2026
70f8121
Make use of max_nodes config property
clauspruefer Feb 5, 2026
0840382
Describe replication order
clauspruefer Feb 5, 2026
f8bf320
Remove empty lines
clauspruefer Feb 5, 2026
1090568
Correct psycopg2 syntax
clauspruefer Feb 7, 2026
f1a2573
Add network reference property
clauspruefer Feb 7, 2026
7ade606
Column replacement will be handled internally
clauspruefer Feb 7, 2026
eb8ab05
Change node id naming schema
clauspruefer Feb 9, 2026
c8f9d5d
Update function naming
clauspruefer Feb 9, 2026
e698368
Add processing logic
clauspruefer Feb 9, 2026
d401c70
Fix typo, ommit last list element from being processed
clauspruefer Feb 9, 2026
6203cad
Adapt replication order to correct postgres syntax
clauspruefer Feb 11, 2026
7355395
Add setting wal_type (logical) and pg_hba authentication on container…
clauspruefer Feb 11, 2026
5a1ab35
Running with publication(s) and subscriptiions from node (only orches…
clauspruefer Feb 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions example/01-logical-replication/REPLICATION-ORDER.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Precise Replication Order

| NodeID | SrcNode | Function | ProcOn | Command |
| ------- | ------- | ------------------- | ------ | ----------------------------------- |
| Node0 | node0 | create_publication | node | create pub 'pub_node0_table1' |
| Node1 | node1 | create_publication | node | create pub 'pub_node1_table1' |
| | node1 | subscribe_to_others | node | create sub 'sub_node1_node0_table1' |
| | node0 | subscribe_to_node | orch | create sub 'sub_node0_node1_table1' |
| Node2 | node2 | create_publication | node | create pub 'pub_node2_table1' |
| | node2 | subscribe_to_others | node | create sub 'sub_node2_node1_table1' |
| | node2 | subscribe_to_others | node | create sub 'sub_node2_node0_table1' |
| | node1 | subscribe_to_node | orch | create sub 'sub_node1_node2_table1' |
| | node0 | subscribe_to_node | orch | create sub 'sub_node0_node2_table1' |
18 changes: 18 additions & 0 deletions example/01-logical-replication/db-node-rpc/class_mapping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
class_mapping = {
'UpdateNetworkTopology': {
'System': 'System',
'Network': 'Network',
'NetworkTopology': 'NetworkTopology',
'NetIPv4': 'NetIPv4',
'NetIPv6': 'NetIPv6',
'TopologyHost': 'TopologyHost'
},
'InitDatabase': {
'Database': 'Database'
},
'CreateReplicaTable': {
'Database': 'Database',
'Table': 'Table',
'Column': 'Column'
}
}
47 changes: 47 additions & 0 deletions example/01-logical-replication/db-node-rpc/class_reference.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
references = {
'UpdateNetworkTopology': {
'System': {
'property_ref': 'System',
'children': {
'Network': {
'property_ref': 'Network'
},
'NetworkTopology': {
'property_ref': 'NetworkTopology',
'children': {
'NetIPv4': {
'property_ref': 'NetIPv4'
},
'NetIPv6': {
'property_ref': 'NetIPv6'
},
'TopologyHost': {
'property_ref': 'TopologyHost'
}
}
}
}
}
},
'InitDatabase': {
'Database': {
'property_ref': 'Database',
'children': {}
}
},
'CreateReplicaTable': {
'Database': {
'property_ref': 'Database',
'children': {
'Table': {
'property_ref': 'Table',
'children': {
'Column': {
'property_ref': 'Column'
}
}
}
}
}
}
}
13 changes: 13 additions & 0 deletions example/01-logical-replication/db-node-rpc/esbconfig.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import_classes = {
'service_implementation': [
'System',
'Network',
'NetworkTopology',
'TopologyHost',
'NetIPv4',
'NetIPv6',
'Database',
'Table',
'Column'
]
}
54 changes: 54 additions & 0 deletions example/01-logical-replication/db-node-rpc/json-rpc-server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import sys
import jsocket
import logging
import subprocess

from microesb import microesb

from class_reference import references as class_reference
from service_properties import service_properties
from class_mapping import class_mapping

logging.getLogger().addHandler(
logging.FileHandler(filename='/tmp/app.log')
)

logging.getLogger().setLevel(
logging.DEBUG
)


def get_current_ip_address():
cmd_get_ip = 'ip -h addr show dev eth0 | grep inet | cut -d " " -f 6'
res = subprocess.run(cmd_get_ip, shell=True, capture_output=True)
raw_ip = res.stdout.strip()
raw_ip_sep = raw_ip.find(b'/')
return raw_ip[:raw_ip_sep]


class JSONServer(jsocket.JsonServer):

def __init__(self, **kwargs):
super().__init__(**kwargs)

def _process_message(self, call_obj):
if isinstance(call_obj, dict):
logging.info('RPC call obj:{}'.format(call_obj))
class_mapper_ref = microesb.ClassMapper(
class_references=class_reference[call_obj['SYSServiceID']],
class_mappings=class_mapping[call_obj['SYSServiceID']],
class_properties=service_properties
)
res = microesb.ServiceExecuter().execute(
class_mapper=class_mapper_ref,
service_data=call_obj
)
logging.info('RPC result:{}'.format(res))
return { "Status": "ok" }
return { "Status": "error - objtype not dict()" }


server = JSONServer(
address=get_current_ip_address(),
port=64000
).server_loop()
192 changes: 192 additions & 0 deletions example/01-logical-replication/db-node-rpc/service_implementation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
import json
import logging
import psycopg2
import subprocess
import sql_queries

from microesb import microesb

logger = logging.getLogger(__name__)


class System(microesb.ClassHandler):

def __init__(self):
super().__init__()

def update_network_topology(self):

self.json_transform()

net_config = {}
net_config['System'] = self.json_dict
net_config['Network'] = self.Network.json_dict
net_config['NetworkTopology'] = self.NetworkTopology.TopologyHost.json_dict

with open('/tmp/net-config.json', 'w') as fh:
fh.write(json.dumps(net_config))


class Network(microesb.ClassHandler):

def __init__(self):
super().__init__()


class NetworkTopology(microesb.ClassHandler):

def __init__(self):
super().__init__()


class NetIPv4(microesb.ClassHandler):

def __init__(self):
super().__init__()


class NetIPv6(microesb.ClassHandler):

def __init__(self):
super().__init__()


class TopologyHost(microesb.MultiClassHandler):

def __init__(self):
super().__init__()


class Database(microesb.ClassHandler):

def __init__(self):
super().__init__()
self.conn = False
self.host = '127.0.0.1'
self.user = 'postgres'
self.autocommit = True

with open('/tmp/net-config.json', 'r') as fh:
self.netconf = json.loads(fh.read())

def _connect(self):
self.conn = psycopg2.connect(
"dbname='{}' user='{}' host='{}'".format(
self.name,
self.user,
self.host
)
)
self.conn.autocommit = self.autocommit

def init_db(self):

cmd_alter_db = "psql -U postgres -c '{}'".format(sql_queries.init_database)
subprocess.run(cmd_alter_db, shell=True, capture_output=True)

self._connect()

with self.conn.cursor() as crs:
crs.execute(sql_queries.init_roles)

def create_replica_table(self):

self._connect()

ct_sql = sql_queries.create_table.format(
table_name=self.Table.name,
table_columns=self.Table.get_table_sql()
)

logger.debug(ct_sql)

with self.conn.cursor() as crs:
crs.execute(ct_sql)

self._create_publication()
self._subscribe_to_others()

def _create_publication(self):
with self.conn.cursor() as crs:
crs.execute(
sql_queries.create_publication.format(
table_name=self.Table.name,
publication_id=self._gen_publication_id()
)
)

def _subscribe_to_others(self):
node_index = self.netconf['System']['node_index']
if node_index > 0:
host_list = self.netconf['NetworkTopology']['TopologyHost']
host_list_cut = host_list[0:node_index]
logger.debug('host_list_cut:{}'.format(host_list_cut))
for node in reversed(host_list_cut):
with self.conn.cursor() as crs:
crs.execute(
sql_queries.create_subscription.format(
host_ip=node['ipv4'],
subscription_id=self._gen_subscription_id(node['name']),
publication_id='pub_{}_{}'.format(node['name'], self.Table.name)
)
)

def subscribe_to_node(self, node_id):
pass

def _gen_publication_id(self):
return 'pub_{}_{}'.format(
self.netconf['Network']['hostname'],
self.Table.name
)

def _gen_subscription_id(self, dst_node_id):
return 'sub_{}_{}_{}'.format(
self.netconf['System']['node_id'],
dst_node_id,
self.Table.name
)


class Table(microesb.ClassHandler):

def __init__(self):
super().__init__()

def get_table_sql(self):
ret_string = ''
for val in self._gen_table_sql():
ret_string += val
return ret_string

def _gen_table_sql(self):
for class_ref in self.Column:
yield class_ref.get_column_sql()
if self.add_timestamp_cols is True:
yield 'created_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, '
yield 'modified_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP'


class Column(microesb.MultiClassHandler):

def __init__(self):
super().__init__()
self.primary_key = False
self.default = False
self.not_null = False

def get_column_sql(self):
ret_string = ''
for val in self._gen_column_sql():
ret_string += val
return ret_string

def _gen_column_sql(self):
yield '{} {}'.format(self.name, self.type)
if self.default is True:
yield ' DEFAULT {}'.format(self.default)
if self.primary_key is True:
yield ' PRIMARY KEY'
if self.not_null is True:
yield ' NOT NULL'
yield ', '
Loading