From 1ae14d8872ffd469be8f6c19173284ed8f90d086 Mon Sep 17 00:00:00 2001
From: Nizo Priskorn <68460690+dpriskorn@users.noreply.github.com>
Date: Mon, 13 Oct 2025 00:21:37 +0200
Subject: [PATCH 1/2] feat: Add support for merging multiple items and
 creating redirects by default.

---
Reviewer note: this patch was hand-edited during review, so the post-image
blob id on the "index" line below is stale; the hunk header and the diffstat
were recounted. Changes compared to the submitted version: merge_items() is
kept (public API), every item is merged before returning, a plain string for
ignore_conflicts/tags is no longer split into characters, is_bot is forwarded
to create_redirect(), and print() was replaced by the module logger.

 wikibaseintegrator/wbi_helpers.py | 95 +++++++++++++++++++++++++++++++
 1 file changed, 95 insertions(+)

diff --git a/wikibaseintegrator/wbi_helpers.py b/wikibaseintegrator/wbi_helpers.py
index f6741407..1c7e1348 100644
--- a/wikibaseintegrator/wbi_helpers.py
+++ b/wikibaseintegrator/wbi_helpers.py
@@ -339,6 +339,101 @@ def edit_entity(data: dict, id: str | None = None, type: str | None = None, base
     return mediawiki_api_call_helper(data=params, is_bot=is_bot, **kwargs)
 
 
+def create_redirect(from_id: str, to_id: str, login: _Login | None = None, is_bot: bool = False, **kwargs: Any) -> dict[str, Any]:
+    """
+    Create a Wikibase redirect from `from_id` to `to_id`.
+
+    This is a best-effort helper: failures are logged and reported in the returned dict instead of being raised.
+
+    :param from_id: The ID to redirect from. This parameter is required.
+    :param to_id: The ID to redirect to. This parameter is required.
+    :param login: A wbi_login.Login instance
+    :param is_bot: Mark this edit as bot.
+    :param kwargs: Additional parameters to pass to mediawiki_api_call_helper
+    :return: The API response, or {"error": "<message>"} if the call raised an exception.
+    """
+    log.info(f"Creating redirect from {from_id} → {to_id}")
+
+    params = {
+        'action': 'wbcreateredirect',
+        'from': from_id,
+        'to': to_id,
+        'format': 'json'
+    }
+
+    try:
+        response = mediawiki_api_call_helper(data=params, login=login, is_bot=is_bot, **kwargs)
+    except Exception as e:
+        log.error(f"Exception when creating redirect {from_id} → {to_id}: {e}")
+        return {"error": str(e)}
+
+    if "error" in response:
+        log.error(f"Failed to create redirect {from_id} → {to_id}: {response['error']}")
+    else:
+        log.info(f"Redirect created successfully: {from_id} → {to_id}")
+    return response
+
+
+def merge_items_and_create_redirect(
+    qids: list[str],
+    login: _Login | None = None,
+    ignore_conflicts: str | list[str] | None = "description",
+    is_bot: bool = False,
+    tags: str | list[str] | None = None,
+    **kwargs: Any
+) -> str:
+    """
+    Merge multiple Wikibase items into the lowest QID and redirect every merged item to it.
+
+    :param qids: List of item QIDs to merge. The lowest QID will be kept.
+    :param login: A wbi_login.Login instance
+    :param ignore_conflicts: Element or list of elements to ignore conflicts for. Can contain "description", "sitelink", "statement". Defaults to "description" for merge to work at all.
+    :param is_bot: Mark this edit as bot
+    :param tags: Single tag string or list of tags to attach to the edit
+    :param kwargs: Additional parameters to pass to mediawiki_api_call_helper
+    :return: Final QID after merge, or an empty string as soon as one merge fails.
+    """
+    # Duplicates would ask the API to merge an item into itself
+    unique_qids = set(qids or [])
+    if len(unique_qids) < 2:
+        raise ValueError("You must provide at least two QIDs to merge")
+
+    # Sort QIDs numerically to keep the lowest
+    to_id, *from_ids = sorted(unique_qids, key=lambda qid: int(qid.lstrip('Q')))
+
+    # Both parameters accept a single value or a list; joining a bare string would split it into characters
+    extra_params = {}
+    if ignore_conflicts:
+        extra_params['ignoreconflicts'] = ignore_conflicts if isinstance(ignore_conflicts, str) else '|'.join(ignore_conflicts)
+    if tags:
+        extra_params['tags'] = tags if isinstance(tags, str) else '|'.join(tags)
+    if is_bot:
+        extra_params['bot'] = ''
+
+    for from_id in from_ids:
+        params = {
+            'action': 'wbmergeitems',
+            'fromid': from_id,
+            'toid': to_id,
+            'format': 'json',
+            **extra_params
+        }
+
+        try:
+            mediawiki_api_call_helper(data=params, login=login, is_bot=is_bot, **kwargs)
+        except Exception as e:
+            log.error(f"Error merging {from_id} into {to_id}: {e}. "
+                      f"Note: sometimes the real cause does not get propagated because of a bug in Wikibase. "
+                      f"See https://github.com/dpriskorn/DanceDatabase/issues/2")
+            return ""
+
+        log.info(f"Merged {from_id} into {to_id}")
+        create_redirect(from_id=from_id, to_id=to_id, login=login, is_bot=is_bot)
+
+    # Only report success once every item has been merged
+    return to_id
+
+
 def merge_items(from_id: str, to_id: str, login: _Login | None = None, ignore_conflicts: list[str] | None = None, is_bot: bool = False, **kwargs: Any) -> dict:
     """
     A static method to merge two items

From d488161d09da6c5b28d22dbcc40704138b265d6b Mon Sep 17 00:00:00 2001
From: Nizo Priskorn <68460690+dpriskorn@users.noreply.github.com>
Date: Thu, 12 Mar 2026 17:51:45 +0100
Subject: [PATCH 2/2] Fix: Handle optional hash field in Reference.from_json()

- Change json_data['hash'] to json_data.get('hash') to allow new references without hash
- Use .get() with defaults for snaks and snaks-order for robustness
- Add tests for from_json with/without hash fields
---
 test/test_references.py                 | 76 +++++++++++++++++++++++++
 wikibaseintegrator/models/references.py |  6 +-
 2 files changed, 79 insertions(+), 3 deletions(-)
 create mode 100644 test/test_references.py

diff --git a/test/test_references.py b/test/test_references.py
new file mode 100644
index 00000000..90bbd41a
--- /dev/null
+++ b/test/test_references.py
@@ -0,0 +1,76 @@
+"""Tests for Reference.from_json() handling of optional fields."""
+
+import pytest
+from wikibaseintegrator.models.references import Reference, References
+
+
+def test_reference_from_json_without_hash():
+    """Test that Reference.from_json() works without hash field (new references)."""
+    json_data = {
+        'snaks': {},
+        'snaks-order': []
+    }
+    reference = Reference().from_json(json_data)
+
+    assert reference.hash is None
+    assert len(reference.snaks) == 0
+    assert reference.snaks_order == []
+
+
+def test_reference_from_json_with_hash():
+    """Test that Reference.from_json() works with hash field (Wikidata format)."""
+    json_data = {
+        'hash': 'abc123def456',
+        'snaks': {},
+        'snaks-order': []
+    }
+    reference = Reference().from_json(json_data)
+
+    assert reference.hash == 'abc123def456'
+    assert len(reference.snaks) == 0
+    assert reference.snaks_order == []
+
+
+def test_references_from_json_without_hash():
+    """Test that References.from_json() works with list of references without hash."""
+    json_data = [
+        {'snaks': {}, 'snaks-order': []},
+        {'snaks': {}, 'snaks-order': []}
+    ]
+    references = References().from_json(json_data)
+
+    assert len(references) == 2
+    # Iterate to access individual references
+    ref_list = list(references.references)
+    assert ref_list[0].hash is None
+    assert ref_list[1].hash is None
+
+
+def test_references_from_json_with_hash():
+    """Test that References.from_json() works with list of references with hash."""
+    json_data = [
+        {'hash': 'hash1', 'snaks': {}, 'snaks-order': []},
+        {'hash': 'hash2', 'snaks': {}, 'snaks-order': []}
+    ]
+    references = References().from_json(json_data)
+
+    assert len(references) == 2
+    ref_list = list(references.references)
+    assert ref_list[0].hash == 'hash1'
+    assert ref_list[1].hash == 'hash2'
+
+
+def test_reference_from_json_mixed_hash():
+    """Test that References.from_json() handles mix of with/without hash."""
+    json_data = [
+        {'hash': 'hash1', 'snaks': {}, 'snaks-order': []},
+        {'snaks': {}, 'snaks-order': []},
+        {'hash': 'hash3', 'snaks': {}, 'snaks-order': []}
+    ]
+    references = References().from_json(json_data)
+
+    assert len(references) == 3
+    ref_list = list(references.references)
+    assert ref_list[0].hash == 'hash1'
+    assert ref_list[1].hash is None
+    assert ref_list[2].hash == 'hash3'
diff --git a/wikibaseintegrator/models/references.py b/wikibaseintegrator/models/references.py
index a566fb84..3a241d55 100644
--- a/wikibaseintegrator/models/references.py
+++ b/wikibaseintegrator/models/references.py
@@ -133,9 +133,9 @@ def add(self, snak: Snak | Claim | None = None, action_if_exists: ActionIfExists
         return self
 
     def from_json(self, json_data: dict[str, Any]) -> Reference:
-        self.hash = json_data['hash']
-        self.snaks = Snaks().from_json(json_data['snaks'])
-        self.snaks_order = json_data['snaks-order']
+        self.hash = json_data.get('hash')
+        self.snaks = Snaks().from_json(json_data.get('snaks', {}))
+        self.snaks_order = json_data.get('snaks-order', [])
 
         return self
 