Source code for pyrekordbox.anlz.file

# -*- coding: utf-8 -*-
# Author: Dylan Jones
# Date:   2023-02-01

import logging
from collections import abc
from pathlib import Path
from typing import Any, Iterator, List, Union

from construct import Int16ub, Struct

from . import structs
from .tags import TAGS, AbstractAnlzTag, StructNotInitializedError

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Base XOR pattern (19 bytes) used by `AnlzFile._parse` to unmask garbled "PSSI"
# tags: the pattern is applied cyclically to the tag bytes from offset 18 onwards,
# after the tag's 16-bit value at offset 16 has been added to each pattern byte.
XOR_MASK = bytearray.fromhex("CB E1 EE FA E5 EE AD EE E9 D2 E9 EB E1 E9 F3 E8 E9 F4 E1")


class BuildFileLengthError(Exception):
    """Raised when a built file's size disagrees with its header's `len_file`.

    Parameters
    ----------
    struct : Struct
        The header struct; its ``len_file`` and ``type`` fields are reported
        in the error message.
    len_data : int
        The number of bytes that were actually produced.
    """

    def __init__(self, struct: Struct, len_data: int) -> None:
        expected = struct.len_file
        kind = struct.type
        message = (
            f"`len_file` ({expected}) of '{kind}' does not "
            f"match the data-length ({len_data})!"
        )
        super().__init__(message)
class AnlzFile(abc.Mapping):  # type: ignore[type-arg]
    """Rekordbox `ANLZnnnn.xxx` binary file handler.

    The parsed tags are kept in file order in ``tags``. The object also acts
    as a read-only mapping: iterating yields the distinct tag types, and
    indexing with a tag type (four upper-case characters) or a tag name
    returns the list of all matching tags.
    """

    def __init__(self) -> None:
        # Path the file was read from; empty until `parse_file` sets it.
        self._path: str = ""
        # Parsed file header; `None` until data has been parsed.
        self.file_header: Union["Struct", None] = None
        # Parsed tags in the order they appear in the file.
        self.tags: List["AbstractAnlzTag"] = []

    @property
    def num_tags(self) -> int:
        """Total number of parsed tags (duplicated types are counted)."""
        return len(self.tags)

    @property
    def tag_types(self) -> List[str]:
        """The type of every parsed tag, in file order."""
        return [tag.type for tag in self.tags]

    @property
    def path(self) -> str:
        """The path this file was read from (empty if parsed from memory)."""
        return self._path

    @classmethod
    def parse(cls, data: bytes) -> "AnlzFile":
        """Parses the in-memory data of a Rekordbox analysis binary file.

        Parameters
        ----------
        data : bytes
            The in-memory binary contents of a Rekordbox analysis file.

        Returns
        -------
        self : AnlzFile
            The new instance with the parsed file content.
        """
        self = cls()
        self._parse(data)
        return self

    @classmethod
    def parse_file(cls, path: Union[str, Path]) -> "AnlzFile":
        """Reads and parses a Rekordbox analysis binary file.

        Parameters
        ----------
        path : str or Path
            The path of a Rekordbox analysis file which is used to read
            the file contents before parsing the binary data.

        Returns
        -------
        self : AnlzFile
            The new instance with the parsed file content.

        Raises
        ------
        ValueError
            If the file extension is not one of `.DAT`, `.EXT` or `.2EX`
            (compared case-insensitively).

        See Also
        --------
        AnlzFile.parse: Parses the data of a Rekordbox analysis file.
        """
        path = Path(path)
        ext = path.suffix.upper()
        if ext not in (".DAT", ".EXT", ".2EX"):
            raise ValueError(f"File type '{ext}' not supported!")

        logger.debug("Reading file %s", path.name)
        with open(path, "rb") as fh:
            data = fh.read()

        self = cls.parse(data)
        self._path = str(path)
        return self

    def _parse(self, data: bytes) -> None:
        """Parses the file header and all tags contained in `data`.

        Tags whose type has no entry in `TAGS` are skipped with a warning.
        On success `file_header` and `tags` are replaced by the parsed values.

        Parameters
        ----------
        data : bytes
            The binary contents of the analysis file.

        Raises
        ------
        AssertionError
            If the file header type is not "PMAI".
        ValueError
            If a tag header reports a length of zero, which would otherwise
            keep the parser at the same offset forever.
        StructNotInitializedError
            If a tag handler was created without a parsed struct.
        """
        file_header = structs.AnlzFileHeader.parse(data)
        tag_type = file_header.type
        assert tag_type == "PMAI"

        tags = []
        mask_len = len(XOR_MASK)
        i = file_header.len_header
        while i < file_header.len_file:
            # Get data starting from struct
            tag_data = data[i:]

            # Get the four byte struct type
            tag_type = tag_data[:4].decode("ascii")

            # Get tag length from generic tag header
            tag_struct = structs.AnlzTag.parse(tag_data)
            len_tag = tag_struct.len_tag
            if len_tag <= 0:
                # The offset is advanced by `len_tag`; a zero length would
                # never reach the end of the file.
                raise ValueError(
                    f"Tag '{tag_type}' at offset {i} has an invalid length "
                    f"({len_tag})!"
                )

            if tag_type == "PSSI":
                # The version that rekordbox 6 *exports* is garbled with an
                # XOR mask. All bytes after byte 17 are XOR-masked with a
                # pattern that is generated by adding the 16-bit value stored
                # at bytes 16-17 to each byte of XOR_MASK.
                # The tag is considered garbled if the raw mood value is not
                # valid. Mood: High=1, Mid=2, Low=3
                mood = Int16ub.parse(tag_data[18:20])
                if 1 <= mood <= 3:
                    logger.debug("PSSI is not garbled!")
                else:
                    logger.debug("PSSI is garbled! (raw_mood=%s)", mood)
                    len_entries = Int16ub.parse(tag_data[16:18])
                    # Copy only this tag's data so the rest of the file
                    # is not touched by the unmasking.
                    mutable_tag_data = bytearray(tag_data[:len_tag])
                    for x in range(len(mutable_tag_data) - 18):
                        # Wrap the sum to a single byte; `len_entries` is a
                        # 16-bit value, so the sum may exceed 511.
                        mask = (XOR_MASK[x % mask_len] + len_entries) & 0xFF
                        mutable_tag_data[18 + x] ^= mask
                    tag_data = bytes(mutable_tag_data)

            try:
                # Parse the struct
                tag = TAGS[tag_type](tag_data)
                if tag.struct is None:
                    raise StructNotInitializedError()
                tags.append(tag)
                len_header = tag.struct.len_header
                logger.debug(
                    "Parsed struct '%s' (len_header=%s, len_tag=%s)",
                    tag_type,
                    len_header,
                    len_tag,
                )
            except KeyError:
                logger.warning("Tag '%s' not supported!", tag_type)

            i += len_tag

        self.file_header = file_header
        self.tags = tags

    def update_len(self) -> None:
        """Updates the lengths stored in the tags and in the file header.

        Every tag updates its own lengths first; the file length is then set
        to the header length plus the sum of all tag lengths.

        Raises
        ------
        StructNotInitializedError
            If the file header or the struct of any tag is `None`.
        """
        if self.file_header is None:
            raise StructNotInitializedError()

        # Update struct lengths
        tags_len = 0
        for tag in self.tags:
            if tag.struct is None:
                raise StructNotInitializedError()
            tag.update_len()
            tags_len += tag.struct.len_tag

        # Update file length
        self.file_header.len_file = self.file_header.len_header + tags_len

    def build(self) -> bytes:
        """Builds the binary contents of the file from the header and tags.

        Returns
        -------
        data : bytes
            The file header followed by the data of all tags in order.

        Raises
        ------
        StructNotInitializedError
            If the file header or the struct of any tag is `None`.
        BuildFileLengthError
            If the `len_file` of the header does not match the number of
            bytes that were built.
        """
        if self.file_header is None:
            raise StructNotInitializedError()
        self.update_len()

        header_data = structs.AnlzFileHeader.build(self.file_header)
        section_data = b"".join(tag.build() for tag in self.tags)
        data: bytes = header_data + section_data

        # Check `len_file` against the size of the data actually produced
        len_data = len(data)
        if self.file_header.len_file != len_data:
            raise BuildFileLengthError(self.file_header, len_data)
        return data

    def save(self, path: Union[str, Path] = "") -> None:
        """Builds the file and writes it to disk.

        Parameters
        ----------
        path : str or Path, optional
            The output path. If empty, the path the file was read from is
            used instead.
        """
        path = path or self._path
        data = self.build()
        with open(path, "wb") as fh:
            fh.write(data)

    def get_tag(self, key: str) -> "AbstractAnlzTag":
        """Returns the first tag matching the tag type or name `key`.

        Raises an `IndexError` if no tag matches.
        """
        return self.__getitem__(key)[0]

    def getall_tags(self, key: str) -> List["AbstractAnlzTag"]:
        """Returns all tags matching the tag type or name `key`."""
        return self.__getitem__(key)

    def get(self, key: str) -> Any:  # type: ignore[override]
        """Returns the result of ``get()`` of the first tag matching `key`.

        Raises an `IndexError` if no tag matches.
        """
        return self.__getitem__(key)[0].get()

    def getall(self, key: str) -> List[Any]:
        """Returns the result of ``get()`` of every tag matching `key`."""
        return [tag.get() for tag in self.__getitem__(key)]

    def __len__(self) -> int:
        # Number of distinct tag types, consistent with `__iter__`.
        # `len(self.keys())` must not be used here: the keys view computes
        # its length via `len()` of the mapping and would recurse endlessly.
        return len({tag.type for tag in self.tags})

    def __iter__(self) -> Iterator[str]:
        return iter({tag.type for tag in self.tags})

    def __getitem__(self, item: str) -> List["AbstractAnlzTag"]:
        # Four upper-case characters are treated as a tag type, everything
        # else as a tag name. Unknown keys yield an empty list.
        if item.isupper() and len(item) == 4:
            return [tag for tag in self.tags if tag.type == item]
        return [tag for tag in self.tags if tag.name == item]

    def __contains__(self, item: str) -> bool:  # type: ignore[override]
        if item.isupper() and len(item) == 4:
            return any(tag.type == item for tag in self.tags)
        return any(tag.name == item for tag in self.tags)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.tag_types})"

    def set_path(self, path: Union[Path, str]) -> None:
        """Passes `path` to the ``set`` method of the first "PPTH" tag."""
        tag = self.get_tag("PPTH")
        tag.set(path)