# -*- coding: utf-8 -*-
# Author: Dylan Jones
# Date: 2023-12-13
import logging
import xml.etree.cElementTree as xml
from dataclasses import dataclass
from datetime import datetime
from enum import Enum, IntEnum
from typing import Any, Dict, List, Tuple, Union
from dateutil.relativedelta import relativedelta # noqa
from sqlalchemy import and_, not_, or_
from sqlalchemy.sql.elements import ColumnElement
from .tables import DjmdContent
logger = logging.getLogger(__name__)
__all__ = [
"LogicalOperator",
"Property",
"Operator",
"Condition",
"SmartList",
]
[docs]
class LogicalOperator(IntEnum):
ALL = 1
ANY = 2
[docs]
class Operator(IntEnum):
EQUAL = 1
NOT_EQUAL = 2
GREATER = 3
LESS = 4
IN_RANGE = 5
IN_LAST = 6
NOT_IN_LAST = 7
CONTAINS = 8
NOT_CONTAINS = 9
STARTS_WITH = 10
ENDS_WITH = 11
[docs]
class Property(str, Enum):
ARTIST = "artist"
ALBUM = "album"
ALBUM_ARTIST = "albumArtist"
ORIGINAL_ARTIST = "originalArtist"
BPM = "bpm"
GROUPING = "grouping"
COMMENTS = "comments"
PRODUCER = "producer"
STOCK_DATE = "stockDate"
DATE_CREATED = "dateCreated"
COUNTER = "counter"
FILENAME = "fileName"
GENRE = "genre"
KEY = "key"
LABEL = "label"
MIX_NAME = "mixName"
MYTAG = "myTag"
RATING = "rating"
DATE_RELEASED = "dateReleased"
REMIXED_BY = "remixedBy"
DURATION = "duration"
NAME = "name"
YEAR = "year"
_STR_OPS = [
Operator.EQUAL,
Operator.NOT_EQUAL,
Operator.CONTAINS,
Operator.NOT_CONTAINS,
Operator.STARTS_WITH,
Operator.ENDS_WITH,
]
_NUM_OPS = [
Operator.EQUAL,
Operator.NOT_EQUAL,
Operator.GREATER,
Operator.LESS,
Operator.IN_RANGE,
]
_DATE_OPS = [
Operator.EQUAL,
Operator.NOT_EQUAL,
Operator.GREATER,
Operator.LESS,
Operator.IN_RANGE,
Operator.IN_LAST,
Operator.NOT_IN_LAST,
]
# Defines the valid operators for each property
VALID_OPS: Dict[str, Any] = {
Property.ARTIST: _STR_OPS,
Property.ALBUM: _STR_OPS,
Property.ALBUM_ARTIST: _STR_OPS,
Property.ORIGINAL_ARTIST: _STR_OPS,
Property.BPM: _NUM_OPS,
Property.GROUPING: [Operator.EQUAL, Operator.NOT_EQUAL],
Property.COMMENTS: _STR_OPS,
Property.PRODUCER: _STR_OPS,
Property.STOCK_DATE: _DATE_OPS,
Property.DATE_CREATED: _DATE_OPS,
Property.COUNTER: _NUM_OPS,
Property.FILENAME: _STR_OPS,
Property.GENRE: _STR_OPS,
Property.KEY: _STR_OPS,
Property.LABEL: _STR_OPS,
Property.MIX_NAME: _STR_OPS,
Property.MYTAG: [Operator.CONTAINS, Operator.NOT_CONTAINS],
Property.RATING: _NUM_OPS,
Property.DATE_RELEASED: _DATE_OPS,
Property.REMIXED_BY: _STR_OPS,
Property.DURATION: _NUM_OPS,
Property.NAME: _STR_OPS,
Property.YEAR: _NUM_OPS,
}
# Defines the column names in the DB for properties that are directly mapped
PROPERTY_COLUMN_MAP: Dict[str, str] = {
Property.ARTIST: "ArtistName",
Property.ALBUM: "AlbumName",
Property.ALBUM_ARTIST: "AlbumArtistName",
Property.ORIGINAL_ARTIST: "OrgArtistName",
Property.BPM: "BPM",
Property.GROUPING: "ColorID",
Property.COMMENTS: "Commnt",
Property.PRODUCER: "ComposerName",
Property.STOCK_DATE: "StockDate",
Property.DATE_CREATED: "created_at",
Property.COUNTER: "DJPlayCount",
Property.FILENAME: "FileNameL",
Property.GENRE: "GenreName",
Property.KEY: "KeyName",
Property.LABEL: "LabelName",
# Property.MIX_NAME don't know what this maps to
Property.MYTAG: "MyTagIDs",
Property.RATING: "Rating",
Property.DATE_RELEASED: "ReleaseDate",
Property.REMIXED_BY: "RemixerName",
Property.DURATION: "Length",
Property.NAME: "Title",
Property.YEAR: "ReleaseYear",
}
TYPE_CONVERSION: Dict[str, Any] = {
Property.BPM: int,
Property.STOCK_DATE: lambda x: datetime.strptime(x, "%Y-%m-%d"),
Property.DATE_CREATED: lambda x: datetime.strptime(x, "%Y-%m-%d"),
Property.COUNTER: int,
Property.RATING: int,
Property.DATE_RELEASED: lambda x: datetime.strptime(x, "%Y-%m-%d"),
Property.DURATION: int,
Property.YEAR: int,
}
PROPERTIES = [str(p.value) for p in list(Property)] # noqa
[docs]
@dataclass
class Condition:
"""Dataclass for a smart playlist condition."""
property: str
operator: int
unit: str
value_left: Union[str, int]
value_right: Union[str, int]
def __post_init__(self) -> None:
if self.property not in PROPERTIES:
raise ValueError(
f"Invalid property: '{self.property}'! Supported properties: {PROPERTIES}"
)
valid_ops = VALID_OPS[self.property]
if self.operator not in valid_ops:
raise ValueError(
f"Invalid operator '{self.operator}' for '{self.property}', "
f"must be one of {valid_ops}"
)
if self.operator == Operator.IN_RANGE:
if not self.value_right:
raise ValueError(f"Operator '{self.operator}' requires `value_right`")
def left_bitshift(x: int, nbit: int = 32) -> int:
"""Left shifts an N bit integer with sign change."""
return int(x - 2**nbit)
def right_bitshift(x: int, nbit: int = 32) -> int:
"""Right shifts an N bit integer with sign change."""
return int(x + 2**nbit)
def _get_condition_values(cond: Condition) -> Tuple[Any, Any]:
val_left = cond.value_left
val_right = cond.value_right
func = None
if cond.operator in (Operator.IN_LAST, Operator.NOT_IN_LAST):
func = int
elif cond.property in TYPE_CONVERSION:
func = TYPE_CONVERSION[cond.property]
if func is not None:
if val_left != "":
val_left = func(val_left)
if val_right != "":
try:
val_right = func(val_right)
except ValueError:
pass
if val_left == "":
val_left = None # type: ignore
return val_left, val_right
[docs]
class SmartList:
"""Rekordbox smart playlist XML handler."""
def __init__(self, logical_operator: int = LogicalOperator.ALL, auto_update: int = 0):
self.playlist_id: Union[int, str] = ""
self.logical_operator: int = int(logical_operator)
self.auto_update: int = auto_update
self.conditions: List[Condition] = list()
[docs]
def parse(self, source: str) -> None:
"""Parse the XML source of a smart playlist."""
tree = xml.ElementTree(xml.fromstring(source))
root = tree.getroot()
conditions = list()
for child in root.findall("CONDITION"):
condition = Condition(
property=child.attrib["PropertyName"],
operator=int(child.attrib["Operator"]),
unit=child.attrib["ValueUnit"],
value_left=child.attrib["ValueLeft"],
value_right=child.attrib["ValueRight"],
)
conditions.append(condition)
self.playlist_id = str(right_bitshift(int(root.attrib["Id"])))
self.logical_operator = int(root.attrib["LogicalOperator"])
self.auto_update = int(root.attrib["AutomaticUpdate"])
self.conditions = conditions
[docs]
def to_xml(self) -> str:
"""Convert the smart playlist conditions to XML."""
attrib = {
"Id": str(left_bitshift(int(self.playlist_id))),
"LogicalOperator": str(self.logical_operator),
"AutomaticUpdate": str(self.auto_update),
}
root = xml.Element("NODE", attrib=attrib)
for cond in self.conditions:
attrib = {
"PropertyName": str(cond.property),
"Operator": str(cond.operator),
"ValueUnit": str(cond.unit),
"ValueLeft": str(cond.value_left),
"ValueRight": str(cond.value_right),
}
xml.SubElement(root, "CONDITION", attrib=attrib)
return xml.tostring(root).decode("utf-8").replace(" /", "/")
[docs]
def add_condition(
self,
prop: str,
operator: int,
value_left: str,
value_right: str = "",
unit: str = "",
) -> None:
"""Add a condition to the smart playlist.
Parameters
----------
prop : str
The property to filter on.
operator : {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11} int
The operator to use. Must be in the range of 1-11
value_left : str
The left value to use.
value_right : str, optional
The right value to use, by default "".
unit : str, optional
The unit to use, by default "".
"""
if isinstance(prop, Property):
prop = str(prop.value)
cond = Condition(prop, int(operator), unit, value_left, value_right)
self.conditions.append(cond)
[docs]
def filter_clause(self) -> ColumnElement[bool]:
"""Return a SQLAlchemy filter clause matching the content of the smart playlist.
Returns
-------
ColumnElement[bool]
A filter list macthing the contents of the smart playlist.
"""
logical_op = and_ if self.logical_operator == LogicalOperator.ALL else or_
comps = list()
for cond in self.conditions:
val_left, val_right = _get_condition_values(cond)
# val_left = str(-abs(int(val_left))) if val_left is not None else ""
if cond.property in PROPERTY_COLUMN_MAP:
colum_name = PROPERTY_COLUMN_MAP[cond.property]
if cond.property == Property.MYTAG:
if int(val_left) < 0:
val_left = str(right_bitshift(int(val_left)))
if cond.operator == Operator.EQUAL:
comp = getattr(DjmdContent, colum_name) == val_left
elif cond.operator == Operator.NOT_EQUAL:
comp = getattr(DjmdContent, colum_name) != val_left
elif cond.operator == Operator.GREATER:
comp = getattr(DjmdContent, colum_name) > val_left
elif cond.operator == Operator.LESS:
comp = getattr(DjmdContent, colum_name) < val_left
elif cond.operator == Operator.IN_RANGE:
comp = getattr(DjmdContent, colum_name).between(val_left, val_right)
elif cond.operator == Operator.CONTAINS:
comp = getattr(DjmdContent, colum_name).contains(val_left)
elif cond.operator == Operator.NOT_CONTAINS:
comp = not_(getattr(DjmdContent, colum_name).contains(val_left))
elif cond.operator == Operator.STARTS_WITH:
comp = getattr(DjmdContent, colum_name).startswith(val_left)
elif cond.operator == Operator.ENDS_WITH:
comp = getattr(DjmdContent, colum_name).endswith(val_left)
elif cond.operator == Operator.IN_LAST:
now = datetime.now()
if cond.unit == "day":
t0 = now - relativedelta(days=val_left)
comp = getattr(DjmdContent, colum_name) > t0
elif cond.unit == "month":
t0 = now - relativedelta(months=val_left)
comp = getattr(DjmdContent, colum_name).month > t0
else:
raise ValueError(f"Unknown unit '{cond.unit}'")
elif cond.operator == Operator.NOT_IN_LAST:
now = datetime.now()
if cond.unit == "day":
t0 = now - relativedelta(days=val_left)
comp = getattr(DjmdContent, colum_name) < t0
elif cond.unit == "month":
t0 = now - relativedelta(months=val_left)
comp = getattr(DjmdContent, colum_name).month < t0
else:
raise ValueError(f"Unknown unit '{cond.unit}'")
else:
raise ValueError(f"Unknown operator '{cond.operator}'")
comps.append(comp)
else:
logger.warning(f"Unsupported property '{cond.property}'")
return logical_op(*comps)