Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions docs/api/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,74 @@ flowchart TB
end
```

### Multiprocessing example

XML parsing is CPU-bound and scales well across processes. Loading into the
database, however, must be coordinated to avoid conflicts on shared tables.
The right level of synchronisation depends on the backend:

* **DuckDB (file-based)** — only one active writer is allowed at a time, so
all database I/O must be serialised.
* **PostgreSQL, MS SQL Server, …** — concurrent writes to *different* temp
tables are safe (each process gets a unique temp-table prefix), but the final
merge into the shared target tables should be serialised.

The simplest approach — and the one shown below — is to serialise the entire
database phase with a `multiprocessing.Lock`, keeping only the parsing step
parallel. This works correctly for all backends.

```python
import multiprocessing
from xml2db import DataModel


def load_one_file(xml_path, xsd_path, connection_string, lock):
    """Parse one XML file and load it into the database (worker entry point).

    Parsing runs outside *lock* so that it can overlap with other workers;
    every database operation runs while *lock* is held.

    Args:
        xml_path: Path of the XML file to load.
        xsd_path: Path of the XSD schema used to build the data model.
        connection_string: Database connection string passed to ``DataModel``.
        lock: A ``multiprocessing.Lock`` shared by all worker processes.
    """
    # Each process creates its own DataModel with a unique temp_prefix.
    model = DataModel(
        xsd_file=xsd_path,
        connection_string=connection_string,
    )
    # XML parsing is CPU-bound and runs in parallel across all processes.
    doc = model.parse_xml(xml_path)

    # Serialise all database I/O across processes.
    with lock:
        try:
            doc.insert_into_target_tables()
        finally:
            # Release this process's connections before handing the lock to
            # the next worker, even when the load fails.
            model.engine.dispose()


if __name__ == "__main__":
    xsd_path = "schema.xsd"
    connection_string = "duckdb:///data.duckdb"
    xml_files = ["file1.xml", "file2.xml", "file3.xml"]

    # One lock shared by every worker serialises the database phase.
    lock = multiprocessing.Lock()
    processes = [
        multiprocessing.Process(
            target=load_one_file,
            args=(xml_path, xsd_path, connection_string, lock),
        )
        for xml_path in xml_files
    ]
    for p in processes:
        p.start()
    # Wait for *every* worker before looking at exit codes, so that a failure
    # never leaves sibling processes un-joined.
    for p in processes:
        p.join()
    for p in processes:
        if p.exitcode != 0:
            raise RuntimeError(f"Worker failed with exit code {p.exitcode}")
```

!!! Note
For backends that support concurrent writers, you can increase throughput
by splitting
[`Document.insert_into_target_tables`](document.md/#xml2db.document.Document.insert_into_target_tables)
into separate calls to
[`Document.insert_into_temp_tables`](document.md/#xml2db.document.Document.insert_into_temp_tables)
(run concurrently — each process has a unique temp-table prefix so there
are no collisions) and
[`Document.merge_into_target_tables`](document.md/#xml2db.document.Document.merge_into_target_tables)
(serialised via lock).

## *Advanced use:* get data from the database back to XML

The flow chart below presents data conversions used to get back data from the database into XML, showing the functions
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "xml2db"
version = "0.13.0"
version = "0.13.1"
authors = [
{ name="Commission de régulation de l'énergie", email="opensource@cre.fr" },
]
Expand Down
114 changes: 114 additions & 0 deletions tests/test_multiprocessing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
"""Tests for concurrent XML loading with multiprocessing and a file-based DuckDB."""
import multiprocessing
import os
import tempfile

import pytest
from lxml import etree

pytest.importorskip("duckdb", reason="duckdb not installed")

from sqlalchemy import String, create_engine, text

from xml2db import DataModel

# Location of the "orders" sample model that ships next to this test module.
_SAMPLE = os.path.join(os.path.dirname(__file__), "sample_models", "orders")
# XSD schema describing the sample documents.
_XSD = os.path.join(_SAMPLE, "orders.xsd")
# The three sample XML documents, one per worker process.
_XML_FILES = [
    os.path.join(_SAMPLE, "xml", f"order{i}.xml") for i in range(1, 4)
]

# Same configuration as version 0 of the orders model in
# sample_models/models.py, so that the XML roundtrip produces byte-for-byte
# identical output.
_MODEL_CONFIG = {
    "tables": {
        "shiporder": {
            "fields": {
                "orderperson": {"transform": False},
            },
        },
        "item": None,
    },
    "record_hash_column_name": "record_hash",
    "metadata_columns": [
        {
            "name": "input_file_path",
            "type": String(256),
        },
    ],
}


def _load_xml_file(xml_path: str, xsd_path: str, db_path: str, lock) -> None:
    """Worker function: parse one XML file and load it into a shared DuckDB file.

    Each process builds its own DataModel (and gets a unique temp_prefix UUID),
    so temporary tables never collide. All database I/O is serialised via *lock*
    because DuckDB allows only one active writer at a time.

    Args:
        xml_path: Path of the XML file to load; also stored in the
            ``input_file_path`` metadata column.
        xsd_path: Path of the XSD schema used to build the data model.
        db_path: Path of the DuckDB database file shared by all workers.
        lock: A ``multiprocessing.Lock`` shared by all workers.
    """
    model = DataModel(
        xsd_file=xsd_path,
        connection_string=f"duckdb:///{db_path}",
        model_config=_MODEL_CONFIG,
    )
    # CPU-bound XML parsing runs in parallel across processes.
    doc = model.parse_xml(xml_path, metadata={"input_file_path": xml_path})

    # Serialise all database access: one writer at a time for DuckDB.
    with lock:
        try:
            doc.insert_into_target_tables()
        finally:
            # Dispose inside the lock so the file handle is released before
            # the next process tries to open the database -- including when
            # the insert raises.
            model.engine.dispose()


def test_multiprocessing_file_duckdb():
    """Three worker processes load XML files concurrently into a file-based DuckDB.

    Parsing happens in parallel; database writes are serialised via a
    multiprocessing.Lock. After all workers finish:
    - the target table must contain one row per XML file, and
    - each file must round-trip back to identical XML (content assertion).
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        db_path = os.path.join(tmpdir, "test.duckdb")
        lock = multiprocessing.Lock()

        processes = [
            multiprocessing.Process(
                target=_load_xml_file,
                args=(xml_path, _XSD, db_path, lock),
            )
            for xml_path in _XML_FILES
        ]
        for p in processes:
            p.start()
        # Join every worker before asserting anything: a failed assertion
        # must not leave siblings writing into tmpdir while it is removed.
        for p in processes:
            p.join()
        for xml_path, p in zip(_XML_FILES, processes):
            assert p.exitcode == 0, (
                f"Worker for {xml_path} "
                f"exited with code {p.exitcode}"
            )

        # --- row count ---
        engine = create_engine(f"duckdb:///{db_path}")
        try:
            with engine.connect() as conn:
                count = conn.execute(text("SELECT COUNT(*) FROM orders")).scalar()
        finally:
            # Always release the database file, even if the query fails.
            engine.dispose()
        assert count == len(_XML_FILES)

        # --- content roundtrip ---
        verify_model = DataModel(
            xsd_file=_XSD,
            connection_string=f"duckdb:///{db_path}",
            model_config=_MODEL_CONFIG,
        )
        try:
            for xml_path in _XML_FILES:
                doc = verify_model.extract_from_database(
                    f"input_file_path='{xml_path}'",
                    force_tz="Europe/Paris",
                )
                src = etree.parse(xml_path).getroot()
                el = doc.to_xml(nsmap=src.nsmap)
                # Copy the root attributes of the source document onto the
                # regenerated root element before comparing.
                for key, val in src.attrib.items():
                    el.set(key, val)
                actual = etree.tostring(
                    el, pretty_print=True, encoding="utf-8", xml_declaration=True
                ).decode("utf-8")
                # Read with the same encoding used to decode ``actual`` rather
                # than the platform's locale default.
                with open(xml_path, encoding="utf-8") as f:
                    expected = f.read()
                assert actual == expected, f"XML roundtrip failed for {xml_path}"
        finally:
            # Release the database file before the temporary directory is
            # removed, even when a roundtrip assertion fails.
            verify_model.engine.dispose()
Loading