From ca454c299eb5b94e1ada9d44b718aff2a5bd5841 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 29 May 2026 08:20:09 +0000 Subject: [PATCH 1/4] Add multiprocessing pattern: test and docs for concurrent XML loading Documents and tests the pattern of using a multiprocessing.Lock to allow parallel XML parsing while serialising DuckDB writes to a shared file-based database. Each worker process gets its own DataModel (unique temp_prefix UUID), so temp tables never collide. engine.dispose() is called inside the lock to release the file handle before the next writer acquires it. https://claude.ai/code/session_01JB3CkHnBDM3aQP2EHhfzjv --- docs/api/overview.md | 65 ++++++++++++++++++++++++++++++ tests/test_multiprocessing.py | 76 +++++++++++++++++++++++++++++++++++ 2 files changed, 141 insertions(+) create mode 100644 tests/test_multiprocessing.py diff --git a/docs/api/overview.md b/docs/api/overview.md index 75de4d5..17d6f98 100644 --- a/docs/api/overview.md +++ b/docs/api/overview.md @@ -53,6 +53,71 @@ flowchart TB end ``` +### Multiprocessing example + +XML parsing is CPU-bound and scales well across processes. Loading into the +database, however, must be coordinated: DuckDB allows only **one active writer +at a time** when multiple processes connect to the same database file, so all +database I/O should be serialised. + +The pattern below keeps parsing parallel while serialising database access with +a `multiprocessing.Lock`. Because every `DataModel` instance generates its own +UUID-based `temp_prefix`, temporary tables are uniquely named per process and +never collide with each other. + +```python +import multiprocessing +from xml2db import DataModel + + +def load_one_file(xml_path, xsd_path, db_path, lock): + # Each process creates its own DataModel with a unique temp_prefix. + model = DataModel( + xsd_file=xsd_path, + connection_string=f"duckdb:///{db_path}", + ) + # XML parsing is CPU-bound and runs in parallel across all processes. 
+ doc = model.parse_xml(xml_path) + + # Serialise all database I/O: DuckDB supports one writer at a time. + with lock: + doc.insert_into_target_tables() + # Dispose inside the lock so the file handle is released before + # the next process opens the database. + model.engine.dispose() + + +if __name__ == "__main__": + xsd_path = "schema.xsd" + db_path = "data.duckdb" + xml_files = ["file1.xml", "file2.xml", "file3.xml"] + + lock = multiprocessing.Lock() + processes = [ + multiprocessing.Process( + target=load_one_file, + args=(xml_path, xsd_path, db_path, lock), + ) + for xml_path in xml_files + ] + for p in processes: + p.start() + for p in processes: + p.join() + if p.exitcode != 0: + raise RuntimeError(f"Worker failed with exit code {p.exitcode}") +``` + +!!! Note + For databases that support concurrent writers (PostgreSQL, MS SQL Server), + only the merge step needs to be serialised. You can split the default + [`Document.insert_into_target_tables`](document.md/#xml2db.document.Document.insert_into_target_tables) + into separate calls to + [`Document.insert_into_temp_tables`](document.md/#xml2db.document.Document.insert_into_temp_tables) + (concurrent, safe because each process has a unique temp-table prefix) and + [`Document.merge_into_target_tables`](document.md/#xml2db.document.Document.merge_into_target_tables) + (serialised via lock). 
+ ## *Advanced use:* get data from the database back to XML The flow chart below presents data conversions used to get back data from the database into XML, showing the functions diff --git a/tests/test_multiprocessing.py b/tests/test_multiprocessing.py new file mode 100644 index 0000000..15ffeae --- /dev/null +++ b/tests/test_multiprocessing.py @@ -0,0 +1,76 @@ +"""Tests for concurrent XML loading with multiprocessing and a file-based DuckDB.""" +import multiprocessing +import os +import tempfile + +import pytest + +pytest.importorskip("duckdb", reason="duckdb not installed") + +from sqlalchemy import create_engine, text + +from xml2db import DataModel + +_SAMPLE = os.path.join(os.path.dirname(__file__), "sample_models", "orders") +_XSD = os.path.join(_SAMPLE, "orders.xsd") +_XML_FILES = [ + os.path.join(_SAMPLE, "xml", f"order{i}.xml") for i in (1, 2, 3) +] + + +def _load_xml_file(xml_path: str, xsd_path: str, db_path: str, lock) -> None: + """Worker function: parse one XML file and load it into a shared DuckDB file. + + Each process builds its own DataModel (and gets a unique temp_prefix UUID), + so temporary tables never collide. All database I/O is serialised via *lock* + because DuckDB allows only one active writer at a time. + """ + model = DataModel( + xsd_file=xsd_path, + connection_string=f"duckdb:///{db_path}", + ) + # CPU-bound XML parsing runs in parallel across processes. + doc = model.parse_xml(xml_path) + + # Serialise all database access: one writer at a time for DuckDB. + with lock: + doc.insert_into_target_tables() + # Dispose inside the lock so the file handle is released before + # the next process tries to open the database. + model.engine.dispose() + + +def test_multiprocessing_file_duckdb(): + """Three worker processes load XML files concurrently into a file-based DuckDB. + + Parsing happens in parallel; database writes are serialised via a + multiprocessing.Lock. 
After all workers finish, the target table must + contain one row per XML file (each file has a distinct batch_id, so no + deduplication occurs). + """ + with tempfile.TemporaryDirectory() as tmpdir: + db_path = os.path.join(tmpdir, "test.duckdb") + lock = multiprocessing.Lock() + + processes = [ + multiprocessing.Process( + target=_load_xml_file, + args=(xml_path, _XSD, db_path, lock), + ) + for xml_path in _XML_FILES + ] + for p in processes: + p.start() + for p in processes: + p.join() + assert p.exitcode == 0, ( + f"Worker for {_XML_FILES[processes.index(p)]} " + f"exited with code {p.exitcode}" + ) + + engine = create_engine(f"duckdb:///{db_path}") + with engine.connect() as conn: + count = conn.execute(text("SELECT COUNT(*) FROM orders")).scalar() + engine.dispose() + + assert count == len(_XML_FILES) From 720d4696f7e9c2a6f5fcf249dde5cfc7d2d447cd Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 29 May 2026 08:48:47 +0000 Subject: [PATCH 2/4] Add XML content roundtrip assertion to multiprocessing test After all worker processes finish loading, verify each file by extracting it from the database and comparing the regenerated XML byte-for-byte with the original. Uses the same model config as the orders roundtrip test (version 0) so the assertion is meaningful. 
https://claude.ai/code/session_01JB3CkHnBDM3aQP2EHhfzjv
---
 tests/test_multiprocessing.py | 50 ++++++++++++++++++++++++++++++-----
 1 file changed, 44 insertions(+), 6 deletions(-)

diff --git a/tests/test_multiprocessing.py b/tests/test_multiprocessing.py
index 15ffeae..674de73 100644
--- a/tests/test_multiprocessing.py
+++ b/tests/test_multiprocessing.py
@@ -4,10 +4,11 @@
 import tempfile
 
 import pytest
+from lxml import etree
 
 pytest.importorskip("duckdb", reason="duckdb not installed")
 
-from sqlalchemy import create_engine, text
+from sqlalchemy import String, create_engine, text
 
 from xml2db import DataModel
 
@@ -17,6 +18,19 @@
     os.path.join(_SAMPLE, "xml", f"order{i}.xml") for i in (1, 2, 3)
 ]
 
+# Matches orders model version 0 in sample_models/models.py so that the XML
+# roundtrip produces byte-for-byte identical output.
+_MODEL_CONFIG = {
+    "tables": {
+        "shiporder": {"fields": {"orderperson": {"transform": False}}},
+        "item": None,
+    },
+    "record_hash_column_name": "record_hash",
+    "metadata_columns": [
+        {"name": "input_file_path", "type": String(256)},
+    ],
+}
+
 
 def _load_xml_file(xml_path: str, xsd_path: str, db_path: str, lock) -> None:
     """Worker function: parse one XML file and load it into a shared DuckDB file.
@@ -28,9 +42,10 @@ def _load_xml_file(xml_path: str, xsd_path: str, db_path: str, lock) -> None:
     model = DataModel(
         xsd_file=xsd_path,
         connection_string=f"duckdb:///{db_path}",
+        model_config=_MODEL_CONFIG,
     )
     # CPU-bound XML parsing runs in parallel across processes.
-    doc = model.parse_xml(xml_path)
+    doc = model.parse_xml(xml_path, metadata={"input_file_path": xml_path})
 
     # Serialise all database access: one writer at a time for DuckDB.
     with lock:
@@ -44,9 +59,9 @@ def test_multiprocessing_file_duckdb():
     """Three worker processes load XML files concurrently into a file-based DuckDB.
 
     Parsing happens in parallel; database writes are serialised via a
-    multiprocessing.Lock. After all workers finish, the target table must
-    contain one row per XML file (each file has a distinct batch_id, so no
-    deduplication occurs).
+    multiprocessing.Lock. After all workers finish:
+    - the target table must contain one row per XML file, and
+    - each file must round-trip back to identical XML (content assertion).
     """
     with tempfile.TemporaryDirectory() as tmpdir:
         db_path = os.path.join(tmpdir, "test.duckdb")
@@ -68,9 +83,32 @@ def test_multiprocessing_file_duckdb():
                 f"exited with code {p.exitcode}"
             )
 
+        # --- row count ---
         engine = create_engine(f"duckdb:///{db_path}")
         with engine.connect() as conn:
             count = conn.execute(text("SELECT COUNT(*) FROM orders")).scalar()
         engine.dispose()
-
         assert count == len(_XML_FILES)
+
+        # --- content roundtrip ---
+        verify_model = DataModel(
+            xsd_file=_XSD,
+            connection_string=f"duckdb:///{db_path}",
+            model_config=_MODEL_CONFIG,
+        )
+        for xml_path in _XML_FILES:
+            doc = verify_model.extract_from_database(
+                f"input_file_path='{xml_path}'",
+                force_tz="Europe/Paris",
+            )
+            src = etree.parse(xml_path).getroot()
+            el = doc.to_xml(nsmap=src.nsmap)
+            for key, val in src.attrib.items():
+                el.set(key, val)
+            actual = etree.tostring(
+                el, pretty_print=True, encoding="utf-8", xml_declaration=True
+            ).decode("utf-8")
+            with open(xml_path, encoding="utf-8") as f:
+                expected = f.read()
+            assert actual == expected, f"XML roundtrip failed for {xml_path}"
+        verify_model.engine.dispose()

From 3d7c871ef28e19fa910c175bc8f65824a5072b94 Mon Sep 17 00:00:00 2001
From: Claude
Date: Fri, 29 May 2026 09:44:22 +0000
Subject: [PATCH 3/4] Make multiprocessing docs intro backend-agnostic
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Open with the general principle (serialise DB access across processes) and
explain why the level of required synchronisation differs per backend, before
presenting the DuckDB-as-concrete-example snippet.
Also rename db_path → connection_string in the code sample to remove the
DuckDB-specific string construction.

https://claude.ai/code/session_01JB3CkHnBDM3aQP2EHhfzjv
---
 docs/api/overview.md | 39 ++++++++++++++++++++++-----------------
 1 file changed, 22 insertions(+), 17 deletions(-)

diff --git a/docs/api/overview.md b/docs/api/overview.md
index 17d6f98..3fb19f2 100644
--- a/docs/api/overview.md
+++ b/docs/api/overview.md
@@ -56,47 +56,51 @@ flowchart TB
 ### Multiprocessing example
 
 XML parsing is CPU-bound and scales well across processes. Loading into the
-database, however, must be coordinated: DuckDB allows only **one active writer
-at a time** when multiple processes connect to the same database file, so all
-database I/O should be serialised.
+database, however, must be coordinated to avoid conflicts on shared tables.
+The right level of synchronisation depends on the backend:
 
-The pattern below keeps parsing parallel while serialising database access with
-a `multiprocessing.Lock`. Because every `DataModel` instance generates its own
-UUID-based `temp_prefix`, temporary tables are uniquely named per process and
-never collide with each other.
+* **DuckDB (file-based)** — only one active writer is allowed at a time, so
+  all database I/O must be serialised.
+* **PostgreSQL, MS SQL Server, …** — concurrent writes to *different* temp
+  tables are safe (each process gets a unique temp-table prefix), but the final
+  merge into the shared target tables should be serialised.
+
+The simplest approach — and the one shown below — is to serialise the entire
+database phase with a `multiprocessing.Lock`, keeping only the parsing step
+parallel. This works correctly for all backends.
 
 ```python
 import multiprocessing
 from xml2db import DataModel
 
 
-def load_one_file(xml_path, xsd_path, db_path, lock):
+def load_one_file(xml_path, xsd_path, connection_string, lock):
     # Each process creates its own DataModel with a unique temp_prefix.
     model = DataModel(
         xsd_file=xsd_path,
-        connection_string=f"duckdb:///{db_path}",
+        connection_string=connection_string,
     )
     # XML parsing is CPU-bound and runs in parallel across all processes.
     doc = model.parse_xml(xml_path)
 
-    # Serialise all database I/O: DuckDB supports one writer at a time.
+    # Serialise all database I/O across processes.
     with lock:
         doc.insert_into_target_tables()
-        # Dispose inside the lock so the file handle is released before
-        # the next process opens the database.
+        # Dispose inside the lock: a file-based backend such as DuckDB cannot
+        # be opened by the next process until this one releases the file.
         model.engine.dispose()
 
 
 if __name__ == "__main__":
     xsd_path = "schema.xsd"
-    db_path = "data.duckdb"
+    connection_string = "duckdb:///data.duckdb"
     xml_files = ["file1.xml", "file2.xml", "file3.xml"]
 
     lock = multiprocessing.Lock()
     processes = [
         multiprocessing.Process(
             target=load_one_file,
-            args=(xml_path, xsd_path, db_path, lock),
+            args=(xml_path, xsd_path, connection_string, lock),
         )
         for xml_path in xml_files
     ]
@@ -109,12 +113,13 @@ if __name__ == "__main__":
 ```
 
 !!! Note
-    For databases that support concurrent writers (PostgreSQL, MS SQL Server),
-    only the merge step needs to be serialised. You can split the default
+    For backends that support concurrent writers, you can increase throughput
+    by splitting
     [`Document.insert_into_target_tables`](document.md/#xml2db.document.Document.insert_into_target_tables)
     into separate calls to
     [`Document.insert_into_temp_tables`](document.md/#xml2db.document.Document.insert_into_temp_tables)
-    (concurrent, safe because each process has a unique temp-table prefix) and
+    (run concurrently — each process has a unique temp-table prefix so there
+    are no collisions) and
     [`Document.merge_into_target_tables`](document.md/#xml2db.document.Document.merge_into_target_tables)
     (serialised via lock).
 

From 092e1bd63a594035980b8ea259b48c7f791d42c7 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 29 May 2026 09:47:49 +0000 Subject: [PATCH 4/4] Bump version to 0.13.1 https://claude.ai/code/session_01JB3CkHnBDM3aQP2EHhfzjv --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 02b3ce6..e3f1138 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "xml2db" -version = "0.13.0" +version = "0.13.1" authors = [ { name="Commission de régulation de l'énergie", email="opensource@cre.fr" }, ]