diff --git a/example/01-logical-replication/REPLICATION-ORDER.md b/example/01-logical-replication/REPLICATION-ORDER.md new file mode 100644 index 0000000..06afa36 --- /dev/null +++ b/example/01-logical-replication/REPLICATION-ORDER.md @@ -0,0 +1,13 @@ +# Precise Replication Order + +| NodeID | SrcNode | Function | ProcOn | Command | +| ------- | ------- | ----------------- | ------ | ----------------------------- | +| Node1 | node1 | node_pub_self | node | create pub 'pub-node1-table1' | +| Node2 | node2 | node_pub_self | node | create pub 'pub-node2-table1' | +| | node2 | node_sub_2others | node | create sub 'sub-node1-table1' | +| | node1 | others_sub_2node | orch | create sub 'sub-node2-table1' | +| Node3 | node3 | node_pub_self | node | create pub 'pub-node3-table1' | +| | node3 | node_sub_2others | node | create sub 'sub-node2-table1' | +| | node3 | node_sub_2others | node | create sub 'sub-node1-table1' | +| | node2 | others_sub_2node | orch | create sub 'sub-node3-table1' | +| | node1 | others_sub_2node | orch | create sub 'sub-node3-table1' | diff --git a/example/01-logical-replication/db-node-rpc/class_mapping.py b/example/01-logical-replication/db-node-rpc/class_mapping.py new file mode 100644 index 0000000..340ffc1 --- /dev/null +++ b/example/01-logical-replication/db-node-rpc/class_mapping.py @@ -0,0 +1,11 @@ +class_mapping = { + 'System': 'System', + 'Network': 'Network', + 'NetworkTopology': 'NetworkTopology', + 'NetIPv4': 'NetIPv4', + 'NetIPv6': 'NetIPv6', + 'TopologyHost': 'TopologyHost', + 'Database': 'Database', + 'Table': 'Table', + 'Column': 'Column' +} diff --git a/example/01-logical-replication/db-node-rpc/class_reference.py b/example/01-logical-replication/db-node-rpc/class_reference.py new file mode 100644 index 0000000..a0ad0da --- /dev/null +++ b/example/01-logical-replication/db-node-rpc/class_reference.py @@ -0,0 +1,46 @@ +references = { + 'UpdateNetworkTopology': { + 'System': { + 'property_ref': 'System', + 'children': { + 'Network': { + 'property_ref': 'Network' + }, + 'NetworkTopology': { + 'property_ref': 'NetworkTopology', + 'children': { + 'NetIPv4': { + 'property_ref': 'NetIPv4' + }, + 'NetIPv6': { + 'property_ref': 'NetIPv6' + }, + 'TopologyHost': { + 'property_ref': 'TopologyHost' + } + } + } + } + } + }, + 'InitDatabase': { + 'Database': { + 'property_ref': 'Database' + } + }, + 'CreateReplicaTable': { + 'Database': { + 'property_ref': 'Database', + 'children': { + 'Table': { + 'property_ref': 'Table', + 'children': { + 'Column': { + 'property_ref': 'Column' + } + } + } + } + } + } +} diff --git a/example/01-logical-replication/db-node-rpc/esbconfig.py b/example/01-logical-replication/db-node-rpc/esbconfig.py new file mode 100644 index 0000000..b10ca8d --- /dev/null +++ b/example/01-logical-replication/db-node-rpc/esbconfig.py @@ -0,0 +1,13 @@ +import_classes = { + 'service_implementation': [ + 'System', + 'Network', + 'NetworkTopology', + 'TopologyHost', + 'NetIPv4', + 'NetIPv6', + 'Database', + 'Table', + 'Column' + ] +} diff --git a/example/01-logical-replication/db-node-rpc/json-rpc-server.py b/example/01-logical-replication/db-node-rpc/json-rpc-server.py new file mode 100644 index 0000000..1e3483d --- /dev/null +++ b/example/01-logical-replication/db-node-rpc/json-rpc-server.py @@ -0,0 +1,53 @@ +import sys +import jsocket +import logging +import subprocess + +from microesb import microesb + +from class_reference import references as class_reference +from service_properties import service_properties +from class_mapping import class_mapping + +logging.getLogger().addHandler( + logging.StreamHandler(sys.stdout) +) + +logging.getLogger().setLevel( + logging.DEBUG +) + + +def get_current_ip_address(): + cmd_get_ip = 'ip -h addr show dev eth0 | grep inet | cut -d " " -f 6' + res = subprocess.run(cmd_get_ip, shell=True, capture_output=True) + raw_ip = res.stdout.strip() + raw_ip_sep = raw_ip.find(b'/') + return raw_ip[:raw_ip_sep] + + +class JSONServer(jsocket.JsonServer): + + def __init__(self, **kwargs): + super().__init__(**kwargs) + + def _process_message(self, call_obj): + if isinstance(call_obj, dict): + print('Call obj:{}'.format(call_obj)) + class_mapper = microesb.ClassMapper( + class_references=class_reference[call_obj['SYSServiceID']], + class_mappings=class_mapping, + class_properties=service_properties + ) + res = microesb.ServiceExecuter().execute( + class_mapper=class_mapper, + service_data=call_obj + ) + return { "Status": "ok" } + return { "Status": "error - objtype not dict()" } + + +server = JSONServer( + address=get_current_ip_address(), + port=64000 +).server_loop() diff --git a/example/01-logical-replication/db-node-rpc/service_implementation.py b/example/01-logical-replication/db-node-rpc/service_implementation.py new file mode 100644 index 0000000..48f0bd6 --- /dev/null +++ b/example/01-logical-replication/db-node-rpc/service_implementation.py @@ -0,0 +1,81 @@ +import json +import logging + +from microesb import microesb + +logger = logging.getLogger(__name__) + + +class System(microesb.ClassHandler): + + def __init__(self): + super().__init__() + + def update_network_topology(self): + + self.json_transform() + + net_config = {} + net_config['Network'] = self.Network.json_dict + net_config['NetworkTopology'] = self.NetworkTopology.TopologyHost.json_dict + + with open('/tmp/net-config.json', 'w')as fh: + fh.write(json.dumps(net_config)) + + +class Network(microesb.ClassHandler): + + def __init__(self): + super().__init__() + + +class NetworkTopology(microesb.ClassHandler): + + def __init__(self): + super().__init__() + + +class NetIPv4(microesb.ClassHandler): + + def __init__(self): + super().__init__() + + +class NetIPv6(microesb.ClassHandler): + + def __init__(self): + super().__init__() + + +class TopologyHost(microesb.MultiClassHandler): + + def __init__(self): + super().__init__() + + +class Database(microesb.ClassHandler): + + def __init__(self): + super().__init__() + + def init_db(self): + pass + + def create_replica_table(self): + pass + + +class Table(microesb.ClassHandler): + + def __init__(self): + super().__init__() + + +class Column(microesb.MultiClassHandler): + + def __init__(self): + super().__init__() + self.primary_key = False + self.name = None + self.type = None + self.default = None diff --git a/example/01-logical-replication/db-node-rpc/service_properties.py b/example/01-logical-replication/db-node-rpc/service_properties.py new file mode 100644 index 0000000..683ff58 --- /dev/null +++ b/example/01-logical-replication/db-node-rpc/service_properties.py @@ -0,0 +1,177 @@ +service_properties = { + 'System': { + 'properties': { + 'id': { + 'type': 'str', + 'default': None, + 'required': True, + 'description': 'System id' + } + }, + 'methods': [ + 'update_network_topology' + ] + }, + 'Network': { + 'properties': { + 'hostname': { + 'type': 'str', + 'default': None, + 'required': True, + 'description': 'Network host name' + }, + 'domain': { + 'type': 'str', + 'default': 'default.localnet', + 'required': True, + 'description': 'Network domain (name)' + }, + 'address_v4': { + 'type': 'str', + 'default': None, + 'required': True, + 'description': 'Network IPv4 address' + }, + 'address_v6': { + 'type': 'str', + 'default': None, + 'required': False, + 'description': 'Network IPv6 address' + } + } + }, + 'NetworkTopology': { + 'properties': { + 'type': { + 'type': 'str', + 'default': 'un-partitioned', + 'required': False, + 'description': 'Network topology type' + } + } + }, + 'NetIPv4': { + 'properties': { + 'subnet': { + 'type': 'str', + 'default': None, + 'required': True, + 'description': 'IPv4 subnet' + }, + 'netmask': { + 'type': 'str', + 'default': None, + 'required': True, + 'description': 'IPv4 netmask' + }, + 'netbits': { + 'type': 'int', + 'default': None, + 'required': True, + 'description': 'IPv4 netmask bits' + }, + 'gateway': { + 'type': 'str', + 'default': None, + 'required': True, + 'description': 'IPv4 gateway address' + }, + 'hostaddress': { + 'type': 'str', + 'default': None, + 'required': False, + 'description': 'IPv4 docker host address' + } + } + }, + 'NetIPv6': { + 'properties': { + } + }, + 'TopologyHost': { + 'properties': { + 'name': { + 'type': 'str', + 'default': None, + 'required': True, + 'description': 'Hostname' + }, + 'ipv4': { + 'type': 'str', + 'default': None, + 'required': False, + 'description': 'IPv4 address' + }, + 'ipv6': { + 'type': 'str', + 'default': None, + 'required': False, + 'description': 'IPv6 address' + } + } + }, + 'Database': { + 'properties': { + 'id': { + 'type': 'str', + 'default': None, + 'required': True, + 'description': 'Database id' + } + }, + 'methods': [ + 'init_db', + 'create_replica_table' + ] + }, + 'Table': { + 'properties': { + 'name': { + 'type': 'str', + 'default': None, + 'required': True, + 'description': 'Table name' + }, + 'add_timestamp_cols': { + 'type': 'bool', + 'default': True, + 'required': False, + 'description': 'Automatically add timestamp columns for insert and update' + }, + 'attach_replication_trigger': { + 'type': 'bool', + 'default': True, + 'required': False, + 'description': 'Automatically attach replication check trigger (currently only update)' + }, + } + }, + 'Column': { + 'properties': { + 'primary_key': { + 'type': 'bool', + 'default': False, + 'required': False, + 'description': 'Column primary key flag' + }, + 'name': { + 'type': 'str', + 'default': None, + 'required': True, + 'description': 'Column id' + }, + 'type': { + 'type': 'str', + 'default': None, + 'required': True, + 'description': 'Column type' + }, + 'default': { + 'type': 'str', + 'default': None, + 'required': False, + 'description': 'Column default value' + } + } + } +} diff --git a/example/01-logical-replication/db-node-rpc/sql_queries.py b/example/01-logical-replication/db-node-rpc/sql_queries.py new file mode 100644 index 0000000..779affb --- /dev/null +++ b/example/01-logical-replication/db-node-rpc/sql_queries.py @@ -0,0 +1,27 @@ +init_db = ''' +ALTER SYSTEM SET wal_level = logical; +ALTER SYSTEM SET max_replication_slots = 10; +ALTER SYSTEM SET max_wal_senders = 10; + +CREATE ROLE replicator WITH REPLICATION LOGIN PASSWORD 'replicator'; +CREATE ROLE testreader WITH LOGIN PASSWORD 'testreader'; +CREATE ROLE testwriter WITH LOGIN PASSWORD 'testwriter'; +''' + +create_table = ''' +CREATE TABLE %(table_name)s ( + {table_columns} +); + +GRANT SELECT ON TABLE %(table_name)s TO replicator; +GRANT SELECT ON TABLE %(table_name)s TO testreader; +GRANT INSERT ON TABLE %(table_name)s TO testwriter; +''' + +create_publication = ''' +CREATE PUBLICATION %(publication_id)s FOR TABLE %(table_name)s; +''' + +create_subscription = ''' +CREATE SUBSCRIPTION %(subscription_id)s CONNECTION 'host=%(host_ip)s dbname=postgres port=5432' PUBLICATION %(publication_id)s WITH (copy_data = false, origin = none); +''' diff --git a/example/01-logical-replication/db-node-rpc/start-server.sh b/example/01-logical-replication/db-node-rpc/start-server.sh new file mode 100755 index 0000000..3ebf1fa --- /dev/null +++ b/example/01-logical-replication/db-node-rpc/start-server.sh @@ -0,0 +1,2 @@ +#!/bin/sh +cd /json-rpc-server && python3 ./json-rpc-server.py & diff --git a/example/01-logical-replication/db-node.dockerfile b/example/01-logical-replication/db-node.dockerfile new file mode 100644 index 0000000..565c696 --- /dev/null +++ b/example/01-logical-replication/db-node.dockerfile @@ -0,0 +1,21 @@ +FROM postgres:18-bookworm +MAINTAINER Claus Prüfer + +RUN apt-get -qq update -y +RUN apt-get -qq install iproute2 iputils-ping net-tools python3-pip python3-psycopg2 -y + +COPY ./packages/jsocket-1.9.5.tar.gz / + +RUN pip3 install microesb --break-system-packages +RUN pip3 install ./jsocket-1.9.5.tar.gz --break-system-packages + +RUN mkdir /json-rpc-server +COPY ./db-node-rpc/*.py /json-rpc-server/ +COPY ./db-node-rpc/*.sh /json-rpc-server/ + +ENV POSTGRES_USER postgres +ENV POSTGRES_PASSWORD password +ENV POSTGRES_DB lb-test + +EXPOSE 5432 +EXPOSE 64000 diff --git a/example/01-logical-replication/docker-build.sh b/example/01-logical-replication/docker-build.sh new file mode 100755 index 0000000..6ce5713 --- /dev/null +++ b/example/01-logical-replication/docker-build.sh @@ -0,0 +1,4 @@ +#!/bin/sh + +# build docker database node +docker build -t db-node --file ./db-node.dockerfile . diff --git a/example/01-logical-replication/docker-daemon/daemon.json b/example/01-logical-replication/docker-daemon/daemon.json new file mode 100644 index 0000000..326fe13 --- /dev/null +++ b/example/01-logical-replication/docker-daemon/daemon.json @@ -0,0 +1,3 @@ +{ + "iptables": false +} diff --git a/example/01-logical-replication/docker-network.sh b/example/01-logical-replication/docker-network.sh new file mode 100755 index 0000000..1c98f93 --- /dev/null +++ b/example/01-logical-replication/docker-network.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +docker network create --subnet=172.16.1.0/24 --gateway=172.16.1.254 -o com.docker.network.bridge.enable_ip_masquerade=false -o com.docker.network.bridge.name=dbr0 dbpool-net diff --git a/example/01-logical-replication/orchestrator.py b/example/01-logical-replication/orchestrator.py new file mode 100644 index 0000000..9dffa83 --- /dev/null +++ b/example/01-logical-replication/orchestrator.py @@ -0,0 +1,85 @@ +import json +import ipcalc +import jsocket +import subprocess + +import svc_call_metadata + + +def mm_connect(dst_address, dst_port=64000): + client = jsocket.JsonClient(address=dst_address, port=dst_port) + assert client.connect() is True + return client + +def mm_send(client_ref, payload): + client_ref.send_obj(payload) + return client_ref.read_obj() + +def mm_close(client_ref): + client_ref.close() + + +# load configuration +with open('./sysconfig.json', 'r') as fh: + sysconfig = json.loads(fh.read()) + +# model config parts +network = sysconfig['system']['networks'][0] + +network_id = network['id'] +network_config = network['config'] +network_config_scale = network['config']['scale'] + +network_segment = '{}/{}'.format( + network_config['net']['ipv4']['subnet'], + network_config['net']['ipv4']['netbits'] +) + +# make network segment iterator +network_ipv4_addresses = iter(ipcalc.Network(network_segment)) + +svc_system = svc_call_metadata.update_net_topology['data'][0]['System'] + +svc_net = svc_system['Network'] +svc_net_topology = svc_system['NetworkTopology'] +svc_net_topology['NetIPv4'] = network_config['net']['ipv4'] + +# get node-count from config +count_nodes = network_config_scale['max-nodes'] + +# start containers +for i in range(0, count_nodes): + + node_id = 'node-'+str(i) + node_ip = next(network_ipv4_addresses) + #node_ip = '192.168.10.120' + + node_cfg = { + 'name': node_id, + 'ipv4': str(node_ip) + } + + svc_net['hostname'] = node_id + svc_net['domain'] = network_config['net']['domain'] + svc_net['address_v4'] = str(node_ip) + + svc_net_topology['TopologyHost'].append(node_cfg) + + cmd_run_container = [] + cmd_run_container.append('./run-container.sh') + cmd_run_container.append(node_id) + cmd_run_container.append(str(node_ip)) + cmd_run_container.append(network['id']) + + subprocess.run(cmd_run_container, capture_output=True, check=True) + + cmd_start_server = 'docker exec {} /json-rpc-server/start-server.sh'.format(node_id) + res = subprocess.run(cmd_start_server, shell=True, capture_output=True, check=True) + + +for node in svc_net_topology['TopologyHost']: + + print(svc_call_metadata.update_net_topology) + client = mm_connect(node['ipv4']) + res = mm_send(client, svc_call_metadata.update_net_topology) + print(res) diff --git a/example/01-logical-replication/run-container.sh b/example/01-logical-replication/run-container.sh new file mode 100755 index 0000000..d380b94 --- /dev/null +++ b/example/01-logical-replication/run-container.sh @@ -0,0 +1,6 @@ +#!/bin/sh +node_id=$1 +node_ip=$2 +node_net=$3 + +docker run --rm -d --name ${node_id} --ip ${node_ip} --net ${node_net} db-node diff --git a/example/01-logical-replication/svc_call_metadata.py b/example/01-logical-replication/svc_call_metadata.py new file mode 100644 index 0000000..742e023 --- /dev/null +++ b/example/01-logical-replication/svc_call_metadata.py @@ -0,0 +1,61 @@ +update_net_topology = { + 'SYSServiceID': 'UpdateNetworkTopology', + 'data': [ + { + 'SYSBackendMethod': { 'System': 'update_network_topology' }, + 'System': { + 'id': 'db-loadbalancing-test', + 'Network': {}, + 'NetworkTopology': { + 'NetIPv4': {}, + 'TopologyHost': [] + } + } + } + ] +} + +set_global_db_properties = { + 'SYSServiceID': 'InitDatabase', + 'data': [ + { + 'SYSBackendMethod': { 'Database': 'init_db' }, + 'Database': { + 'id': 'lb-test' + } + } + ] +} + +create_repl_table = { + 'SYSServiceID': 'CreateReplicaTable', + 'data': [ + { + 'SYSBackendMethod': { 'Database': 'create_replica_table' }, + 'Database': { + 'id': 'lb-test', + 'Table': { + 'name': 'table1', + 'add_timestamp_cols': True, + 'attach_replication_trigger': True, + 'Column': [ + { + 'name': 'id', + 'type': 'serial', + 'primary_key': True + }, + { + 'name': 'col1', + 'type': 'varchar', + 'default': 'default-value' + }, + { + 'name': 'col2', + 'type': 'varchar' + } + ] + } + } + } + ] +} diff --git a/example/01-logical-replication/sysconfig.json b/example/01-logical-replication/sysconfig.json new file mode 100644 index 0000000..4e883f9 --- /dev/null +++ b/example/01-logical-replication/sysconfig.json @@ -0,0 +1,65 @@ +{ + "system": { + "config": { + "network": { + "failover_mode": false, + "run_on_host": true, + "managers": [ "172.16.1.254" ] + }, + "roles": { + "SYSOwner": { + "id": "admin", + "password": "admin" + }, + "SYSPgDBReplication": { + "id": "replicator", + "password": "replicator" + } + } + }, + "networks": [ + { + "id": "dbpool-net", + "config": { + "net": { + "ipv4": { + "subnet": "172.16.1.0", + "netmask": "255.255.255.0", + "netbits": 24, + "gateway": "172.16.1.254", + "hostaddress": "172.16.1.254" + }, + "domain": ".localnet", + "domain_prepend_netid": true, + "parts": [ + { + "id": "dbpool-net-1", + "ipv4": { + "start": 1, + "end": 253 + } + } + ], + "parent": null + }, + "permissions": { + "SYSOwner": "admin" + }, + "scale": { + "min-nodes": 2, + "max-nodes": 6, + "metrics": { + "characteristics": "default" + } + } + } + } + ], + "runtime": { + "network": { + "nodes": [ + ] + } + } + } +}