# -*- coding: utf-8 -*-
# Author: Dylan Jones
# Date: 2023-02-01
import logging
from collections import abc
from pathlib import Path
from typing import Any, Iterator, List, Union
from construct import Int16ub, Struct
from . import structs
from .tags import TAGS, AbstractAnlzTag, StructNotInitializedError
logger = logging.getLogger(__name__)
# Base XOR pattern (19 bytes) used by `AnlzFile._parse` to un-garble `PSSI`
# tags: the parser adds the tag's entry count to each byte of this pattern and
# XORs the result, repeated cyclically, over the tag data from byte 18 onwards.
XOR_MASK = bytearray.fromhex("CB E1 EE FA E5 EE AD EE E9 D2 E9 EB E1 E9 F3 E8 E9 F4 E1")
[docs]
class BuildFileLengthError(Exception):
    """Error reporting that a header's `len_file` differs from a data length.

    Parameters
    ----------
    struct : Struct
        The header struct; its `len_file` and `type` fields are put into the
        error message.
    len_data : int
        The data length that `len_file` was compared against.
    """

    def __init__(self, struct: "Struct", len_data: int) -> None:
        template = "`len_file` ({}) of '{}' does not match the data-length ({})!"
        message = template.format(struct.len_file, struct.type, len_data)
        super().__init__(message)
[docs]
class AnlzFile(abc.Mapping):  # type: ignore[type-arg]
    """Rekordbox `ANLZnnnn.xxx` binary file handler.

    A file is a `PMAI` file header followed by a sequence of tags. Parsed tags
    are stored in file order in `tags`. The mapping interface groups them by
    key, where a key is either a four-character upper-case tag type or a tag
    name. Indexing returns the *list* of matching tags, which is empty (no
    `KeyError`) when nothing matches.
    """

    def __init__(self) -> None:
        self._path: str = ""
        self.file_header: Union[Struct, None] = None
        self.tags: List[AbstractAnlzTag] = list()

    @property
    def num_tags(self) -> int:
        """The number of parsed tags, counting repeated types individually."""
        return len(self.tags)

    @property
    def tag_types(self) -> List[str]:
        """The type of every parsed tag, in file order (may contain repeats)."""
        return [tag.type for tag in self.tags]

    @property
    def path(self) -> str:
        """The path set by `parse_file`, or an empty string."""
        return self._path

    @classmethod
    def parse(cls, data: bytes) -> "AnlzFile":
        """Parses the in-memory data of a Rekordbox analysis binary file.

        Parameters
        ----------
        data : bytes
            The in-memory binary contents of a Rekordbox analysis file.

        Returns
        -------
        self : AnlzFile
            The new instance with the parsed file content.
        """
        self = cls()
        self._parse(data)
        return self

    @classmethod
    def parse_file(cls, path: Union[str, Path]) -> "AnlzFile":
        """Reads and parses a Rekordbox analysis binary file.

        Parameters
        ----------
        path : str or Path
            The path of a Rekordbox analysis file which is used to read
            the file contents before parsing the binary data.

        Returns
        -------
        self : AnlzFile
            The new instance with the parsed file content.

        Raises
        ------
        ValueError
            If the file extension is not one of `.DAT`, `.EXT` or `.2EX`
            (compared case-insensitively).

        See Also
        --------
        AnlzFile.parse: Parses the data of a Rekordbox analysis file.
        """
        path = Path(path)
        ext = path.suffix.upper()
        if ext not in (".DAT", ".EXT", ".2EX"):
            raise ValueError(f"File type '{ext}' not supported!")

        logger.debug("Reading file %s", path.name)
        self = cls.parse(path.read_bytes())
        self._path = str(path)
        return self

    @staticmethod
    def _unmask_pssi(tag_data: bytes, len_tag: int) -> bytes:
        """Returns the data of a `PSSI` tag with the XOR garbling removed.

        The version that rekordbox 6 *exports* is garbled with an XOR mask:
        all bytes after byte 17 are XOR-ed with a repeating pattern that is
        generated by adding the entry count (bytes 16-17) to each byte of
        `XOR_MASK`.

        Parameters
        ----------
        tag_data : bytes
            The file contents starting at the first byte of the tag.
        len_tag : int
            The total length of the tag as given by its generic header.

        Returns
        -------
        bytes
            `tag_data` unchanged if the tag is not garbled, otherwise an
            unmasked copy truncated to `len_tag` bytes.
        """
        # A valid mood is High=1, Mid=2 or Low=3; anything else is taken as a
        # sign that the tag is garbled.
        # NOTE(review): the original comment also mentioned validating the
        # bank value, but only the mood has ever been checked here.
        mood = Int16ub.parse(tag_data[18:20])
        if 1 <= mood <= 3:
            logger.debug("PSSI is not garbled!")
            return tag_data

        logger.debug("PSSI is garbled! (raw_mood=%s)", mood)
        len_entries = Int16ub.parse(tag_data[16:18])
        # The pattern only depends on the entry count, so build it once.
        # `& 0xFF` keeps every mask byte in range even when the 16-bit entry
        # count is large enough to overflow a byte more than once.
        mask = bytes((byte + len_entries) & 0xFF for byte in XOR_MASK)
        # Copy only this tag's bytes so the rest of the file is not modified.
        unmasked = bytearray(tag_data[:len_tag])
        for pos in range(18, len(unmasked)):
            unmasked[pos] ^= mask[(pos - 18) % len(mask)]
        return bytes(unmasked)

    def _parse(self, data: bytes) -> None:
        """Parses the file header and all tags of `data` into this instance.

        Tags whose type has no entry in `TAGS` are logged and skipped.
        `file_header` and `tags` are only assigned once the whole file has
        been parsed.

        Raises
        ------
        AssertionError
            If the file header type is not `PMAI`.
        ValueError
            If a tag header reports a length of zero, since the parser could
            then never advance past that tag.
        StructNotInitializedError
            If a parsed tag has no struct.
        """
        file_header = structs.AnlzFileHeader.parse(data)
        # Raised explicitly (instead of using `assert`) so the check is not
        # stripped when Python runs with `-O`; the exception type is unchanged.
        if file_header.type != "PMAI":
            raise AssertionError(
                f"Unexpected file header type '{file_header.type}', expected 'PMAI'"
            )

        tags: List[AbstractAnlzTag] = []
        offset = file_header.len_header
        while offset < file_header.len_file:
            # Data starting at the current tag
            tag_data = data[offset:]
            # The first four bytes are the tag type
            tag_type = tag_data[:4].decode("ascii")
            # The length comes from the generic tag header so that tags of an
            # unsupported type can still be skipped.
            len_tag = structs.AnlzTag.parse(tag_data).len_tag
            if len_tag <= 0:
                raise ValueError(
                    f"Tag '{tag_type}' at offset {offset} has invalid "
                    f"length {len_tag}"
                )

            if tag_type == "PSSI":
                tag_data = self._unmask_pssi(tag_data, len_tag)

            try:
                # Parse the struct
                tag = TAGS[tag_type](tag_data)
                if tag.struct is None:
                    raise StructNotInitializedError()
                tags.append(tag)
                logger.debug(
                    "Parsed struct '%s' (len_header=%s, len_tag=%s)",
                    tag_type,
                    tag.struct.len_header,
                    len_tag,
                )
            except KeyError:
                logger.warning("Tag '%s' not supported!", tag_type)

            offset += len_tag

        self.file_header = file_header
        self.tags = tags

    def update_len(self) -> None:
        """Updates the lengths of all tags and the `len_file` of the header.

        Calls `update_len` on every tag and sets the header's `len_file` to
        the header length plus the sum of all tag lengths.

        Raises
        ------
        StructNotInitializedError
            If the file header or the struct of any tag is not set.
        """
        if self.file_header is None:
            raise StructNotInitializedError()

        tags_len = 0
        for tag in self.tags:
            if tag.struct is None:
                raise StructNotInitializedError()
            tag.update_len()
            tags_len += tag.struct.len_tag

        self.file_header.len_file = self.file_header.len_header + tags_len

    def build(self) -> bytes:
        """Builds the binary file contents from the header and all tags.

        Returns
        -------
        data : bytes
            The built file header followed by the built tags in order.

        Raises
        ------
        StructNotInitializedError
            If the file header or the struct of any tag is not set.
        BuildFileLengthError
            If the length of the built data differs from the header's
            `len_file`.
        """
        if self.file_header is None:
            raise StructNotInitializedError()
        self.update_len()

        header_data = structs.AnlzFileHeader.build(self.file_header)
        section_data = b"".join(tag.build() for tag in self.tags)
        data: bytes = header_data + section_data

        # Check `len_file` against what was actually built
        len_data = len(data)
        if self.file_header.len_file != len_data:
            # Report the real data length, not `len_file` a second time
            raise BuildFileLengthError(self.file_header, len_data)
        return data

    def save(self, path: Union[str, Path] = "") -> None:
        """Builds the file contents and writes them to disk.

        Parameters
        ----------
        path : str or Path, optional
            The output path. If empty, the path stored by `parse_file` is used.
        """
        path = path or self._path
        # Build before opening so a failing build does not truncate the file
        data = self.build()
        with open(path, "wb") as fh:
            fh.write(data)

    def get_tag(self, key: str) -> "AbstractAnlzTag":
        """Returns the first tag matching `key` (a tag type or tag name).

        Raises
        ------
        IndexError
            If no tag matches `key`.
        """
        return self[key][0]

    def get(self, key: str) -> Any:  # type: ignore[override]
        """Returns the result of `get()` of the first tag matching `key`.

        Raises
        ------
        IndexError
            If no tag matches `key`.
        """
        return self[key][0].get()

    def getall(self, key: str) -> List[Any]:
        """Returns the results of `get()` of all tags matching `key`."""
        return [tag.get() for tag in self[key]]

    def __len__(self) -> int:
        # Counted directly: `len(self.keys())` would call back into this
        # method through the `KeysView` and recurse without end.
        return len({tag.type for tag in self.tags})

    def __iter__(self) -> Iterator[str]:
        # `dict.fromkeys` drops repeated types but, unlike a set, keeps the
        # order of first appearance, so iteration order is reproducible.
        return iter(dict.fromkeys(tag.type for tag in self.tags))

    def __getitem__(self, item: str) -> List["AbstractAnlzTag"]:
        # Four upper-case characters are treated as a tag type, anything else
        # as a tag name. No match yields an empty list, not a `KeyError`.
        if item.isupper() and len(item) == 4:
            return [tag for tag in self.tags if tag.type == item]
        return [tag for tag in self.tags if tag.name == item]

    def __contains__(self, item: str) -> bool:  # type: ignore[override]
        # Same key rule as `__getitem__`
        if item.isupper() and len(item) == 4:
            return any(item == tag.type for tag in self.tags)
        return any(item == tag.name for tag in self.tags)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.tag_types})"

    def set_path(self, path: Union[Path, str]) -> None:
        """Passes `path` to the `set` method of the first `PPTH` tag.

        Raises
        ------
        IndexError
            If the file contains no `PPTH` tag.
        """
        tag = self.get_tag("PPTH")
        tag.set(path)