Source code for pyrekordbox.db6.database

# -*- coding: utf-8 -*-
# Author: Dylan Jones
# Date:   2023-08-13

import datetime
import logging
import secrets
from pathlib import Path
from types import TracebackType
from typing import Any, Callable, Dict, List, Optional, Type, TypeVar, Union
from uuid import uuid4

from sqlalchemy import MetaData, create_engine, event, or_, select
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import Query, Session
from sqlalchemy.sql.sqltypes import DateTime, String

from ..anlz import AnlzFile, get_anlz_paths, read_anlz_files  # type: ignore[attr-defined]
from ..config import get_config
from ..utils import deobfuscate, get_rekordbox_pid
from . import tables
from .aux_files import MasterPlaylistXml
from .registry import RekordboxAgentRegistry
from .smartlist import SmartList
from .tables import DjmdContent, DjmdPlaylist, DjmdSongPlaylist, FileType, PlaylistType

try:
    from sqlcipher3 import dbapi2 as sqlite3  # noqa

    _sqlcipher_available = True
except ImportError:  # pragma: no cover
    import sqlite3  # type: ignore[no-redef]

    _sqlcipher_available = False

SPECIAL_PLAYLIST_IDS = [
    "100000",  # Cloud Library Sync
    "200000",  # CUE Analysis Playlist
]

BLOB = b"PN_Pq^*N>(JYe*u^8;Yg76HuZ<mR13S?=>)b9;DpoTXV(6ItkU`}8*m6tx_I{Solh_N#dfe{v="

logger = logging.getLogger(__name__)

PathLike = Union[str, Path]
ContentLike = Union[DjmdContent, int, str]
PlaylistLike = Union[DjmdPlaylist, int, str]
T = TypeVar("T", bound=tables.Base)


def _parse_query_result(query: Query[T], kwargs: Dict[str, Any]) -> Any:
    """Turn a filtered query into a single row when it was filtered by a key.

    Parameters
    ----------
    query : sqlalchemy.orm.query.Query
        The already filtered query.
    kwargs : dict
        The filter arguments that were used to build ``query``.

    Returns
    -------
    result
        The unchanged ``query`` if neither ``ID`` nor ``registry_id`` was used
        as filter. Otherwise the single matching row, or ``None`` if no row
        matches.
    """
    filtered_by_key = not {"ID", "registry_id"}.isdisjoint(kwargs)
    if not filtered_by_key:
        return query

    try:
        row = query.one()
    except NoResultFound:
        return None
    return row


class SessionNotInitializedError(Exception):
    """Raised when a database operation is attempted without an open session."""

    def __init__(self) -> None:
        # Fixed typo in the user-facing message ("intialized" -> "initialized")
        super().__init__("Sqlite-session not initialized!")
class Rekordbox6Database:
    """Rekordbox v6 master.db database handler.

    Parameters
    ----------
    path : str or Path, optional
        The path of the Rekordbox v6 database file. By default, pyrekordbox
        automatically finds the Rekordbox v6 master.db database file.
        This parameter is only required for opening other databases or if the
        configuration fails.
    db_dir: str, optional
        The path of the Rekordbox v6 database directory. By default, pyrekordbox
        automatically finds the Rekordbox v6 database directory. Usually this is
        also the root directory of the analysis files. This parameter is only
        required for finding the analysis root directory if you are opening a
        database, that is stored somewhere else.
    key : str, optional
        The database key. By default, pyrekordbox automatically reads the database
        key from the Rekordbox v6 configuration file. This parameter is only
        required if the key extraction fails.
    unlock: bool, optional
        Flag if the database needs to be decrypted. Set to False if you are opening
        an unencrypted test database.

    Attributes
    ----------
    engine : sqlalchemy.engine.Engine
        The SQLAlchemy engine instance for the Rekordbox v6 database.
    session : sqlalchemy.orm.Session
        The SQLAlchemy session instance bound to the engine.

    See Also
    --------
    pyrekordbox.db6.tables: Rekordbox v6 database table definitions
    create_rekordbox_engine: Creates the SQLAlchemy engine for the Rekordbox v6 database

    Examples
    --------
    Pyrekordbox automatically finds the Rekordbox v6 master.db database file and
    opens it when initializing the object:

    >>> db = Rekordbox6Database()

    Use the included getters for querying the database:

    >>> db.get_content()[0]
    <DjmdContent(40110712 Title=NOISE)>
    """

    def __init__(
        self,
        path: Optional[PathLike] = None,
        db_dir: PathLike = "",
        key: str = "",
        unlock: bool = True,
    ):
        # Use the config of the latest supported version, fall back to v6
        rb_config = get_config("rekordbox7") or get_config("rekordbox6")

        if get_rekordbox_pid():
            logger.warning("Rekordbox is running!")

        if not path:
            # Get path from the RB config (which may be missing entirely)
            path = rb_config.get("db_path", "") if rb_config else ""
        if not path:
            pdir = get_config("pioneer", "install_dir")
            raise FileNotFoundError(f"No Rekordbox v6/v7 directory found in '{pdir}'")

        db_path: Path = Path(path)
        # make sure file exists
        if not db_path.exists():
            raise FileNotFoundError(f"File '{db_path}' does not exist!")

        # Open database
        if unlock:
            if not _sqlcipher_available:  # pragma: no cover
                raise ImportError("Could not unlock database: 'sqlcipher3' package not found")
            if not key:  # pragma: no cover
                key = deobfuscate(BLOB)
            elif not key.startswith("402fd"):
                # Check if key looks like a valid key
                raise ValueError("The provided database key doesn't look valid!")
            # Unlock database and create engine
            logger.debug("Key: %s", key)
            url = f"sqlite+pysqlcipher://:{key}@/{db_path}?"
            engine = create_engine(url, module=sqlite3)
        else:
            engine = create_engine(f"sqlite:///{db_path}")

        # The database directory defaults to the directory of the database file
        db_directory: Path = Path(db_dir) if db_dir else db_path.parent
        if not db_directory.exists():
            raise FileNotFoundError(f"Database directory '{db_directory}' does not exist!")

        self.engine = engine
        self.session: Optional[Session] = None
        self.registry = RekordboxAgentRegistry(self)
        self._events: Dict[str, Callable[[Any], None]] = dict()

        # The playlist XML is optional: without it only the DB is updated on commit
        self.playlist_xml: Optional[MasterPlaylistXml]
        try:
            self.playlist_xml = MasterPlaylistXml(db_dir=db_directory)
        except FileNotFoundError:
            logger.warning("No masterPlaylists6.xml found in %s", db_directory)
            self.playlist_xml = None

        self._db_dir = db_directory
        self._share_dir: Path = db_directory / "share"

        self.open()

    @property
    def no_autoflush(self) -> Any:
        """Creates a no-autoflush context.

        Raises
        ------
        SessionNotInitializedError
            If no session is open.
        """
        if self.session is None:
            raise SessionNotInitializedError()
        return self.session.no_autoflush

    @property
    def db_directory(self) -> Path:
        """The database directory that was resolved when creating the handler."""
        return self._db_dir

    @property
    def share_directory(self) -> Path:
        """The ``share`` sub-directory of the database directory."""
        return self._share_dir
def open(self) -> None:
    """Open the database by creating a new session on the SQLAlchemy engine.

    Nothing happens if a session is already open; a new session is only
    created after the previous one was closed.

    Examples
    --------
    >>> db = Rekordbox6Database()
    >>> db.close()
    >>> db.open()
    """
    if self.session is not None:
        return
    self.session = Session(bind=self.engine)
    self.registry.clear_buffer()
def close(self) -> None:
    """Close the currently active session.

    All event callbacks registered on the session are removed first and the
    internal callback bookkeeping is reset, since the callbacks belong to the
    session that is being closed.

    Raises
    ------
    SessionNotInitializedError
        If no session is open.
    """
    if self.session is None:
        raise SessionNotInitializedError()
    # Iterate over a snapshot so the bookkeeping may change while unregistering
    for key in list(self._events):
        self.unregister_event(key)
    # Forget the callbacks of this session: a session created by a later
    # `open()` never had them attached, so removing them again would fail.
    self._events.clear()
    self.registry.clear_buffer()
    self.session.close()
    self.session = None
def __enter__(self) -> "Rekordbox6Database":
    """Enter the context manager and hand back the database handler itself."""
    return self

def __exit__(
    self,
    type_: Optional[Type[BaseException]],
    value: Optional[BaseException],
    traceback: Optional[TracebackType],
) -> None:
    """Leave the context manager by closing the session.

    Returns ``None``, so an exception raised inside the ``with`` block is
    not suppressed.
    """
    self.close()
def register_event(self, identifier: str, fn: Callable[[Any], None]) -> None:
    """Attach a callback to an event of the current session.

    Parameters
    ----------
    identifier : str
        The identifier of the event, for example 'before_flush',
        'after_commit', ... See the SQLAlchemy documentation for a list of
        valid event identifiers.
    fn : callable
        The event callback method.

    Raises
    ------
    SessionNotInitializedError
        If no session is open.
    """
    session = self.session
    if session is None:
        raise SessionNotInitializedError()
    event.listen(session, identifier, fn)
    # Remember the callback so it can be removed again later
    self._events[identifier] = fn
def unregister_event(self, identifier: str) -> None:
    """Removes an existing session event callback.

    Parameters
    ----------
    identifier : str
        The identifier of the event

    Raises
    ------
    SessionNotInitializedError
        If no session is open.
    KeyError
        If no callback was registered for ``identifier``.
    """
    if self.session is None:
        raise SessionNotInitializedError()
    fn = self._events[identifier]
    event.remove(self.session, identifier, fn)
    # Drop the bookkeeping entry, otherwise `close()` would try to remove the
    # same listener a second time. The mapping is rebound instead of mutated in
    # place because `close()` calls this method while iterating over it.
    self._events = {key: cb for key, cb in self._events.items() if key != identifier}
def query(self, *entities: Any, **kwargs: Any) -> Any:
    """Start a new SQL query on the current session.

    Parameters
    ----------
    *entities : Base
        The table objects (or columns) for which the query is created.
    **kwargs
        Arbitrary keyword arguments forwarded to ``Session.query``.

    Returns
    -------
    query : sqlalchemy.orm.query.Query
        The SQLAlchemy ``Query`` object.

    Raises
    ------
    SessionNotInitializedError
        If no session is open.

    Examples
    --------
    Query the ``DjmdContent`` table

    >>> db = Rekordbox6Database()
    >>> query = db.query(DjmdContent)

    Query the `Title` attribute of the ``DjmdContent`` table

    >>> db = Rekordbox6Database()
    >>> query = db.query(DjmdContent.Title)
    """
    session = self.session
    if session is None:
        raise SessionNotInitializedError()
    return session.query(*entities, **kwargs)
def add(self, instance: tables.Base) -> None:
    """Add a new row to the Rekordbox database.

    The row is added to the session and then reported to the agent registry
    as created.

    Parameters
    ----------
    instance : tables.Base
        The table entry to add.

    Raises
    ------
    SessionNotInitializedError
        If no session is open.
    """
    session = self.session
    if session is None:
        raise SessionNotInitializedError()
    session.add(instance)
    self.registry.on_create(instance)
def delete(self, instance: tables.Base) -> None:
    """Delete a row from the Rekordbox database.

    The row is marked for deletion in the session and then reported to the
    agent registry as deleted.

    Parameters
    ----------
    instance : tables.Base
        The table entry to delete.

    Raises
    ------
    SessionNotInitializedError
        If no session is open.
    """
    session = self.session
    if session is None:
        raise SessionNotInitializedError()
    session.delete(instance)
    self.registry.on_delete(instance)
def get_local_usn(self) -> int:
    """Return the local update sequence number (update count) of Rekordbox.

    Any changes made to the `Djmd...` tables increments the local update
    count of Rekordbox. The ``usn`` entry of the changed row is set to the
    corresponding update count.

    Returns
    -------
    usn : int
        The value of the local update count, as reported by the registry.
    """
    usn = self.registry.get_local_update_count()
    return usn
def set_local_usn(self, usn: int) -> None:
    """Overwrite the local update sequence number (update count) of Rekordbox.

    Parameters
    ----------
    usn : int
        The new update sequence number. It is handed to the registry as is.
    """
    registry = self.registry
    registry.set_local_update_count(usn)
def increment_local_usn(self, num: int = 1) -> int:
    """Increase the local update sequence number (update count) of Rekordbox.

    Parameters
    ----------
    num : int, optional
        The number of times to increment the update counter. By default, the
        counter is incremented by 1.

    Returns
    -------
    usn : int
        The value of the incremented local update count, as reported by the
        registry.

    Examples
    --------
    >>> db = Rekordbox6Database()
    >>> db.get_local_usn()
    70500

    >>> db.increment_local_usn()
    70501

    >>> db.get_local_usn()
    70501
    """
    new_usn = self.registry.increment_local_update_count(num)
    return new_usn
def autoincrement_usn(self, set_row_usn: bool = True) -> int:
    """Auto-increment the local USN for all uncommitted changes.

    Parameters
    ----------
    set_row_usn : bool, optional
        If True, set the ``rb_local_usn`` value of updated or added rows
        according to the uncommitted update sequence.

    Returns
    -------
    new_usn : int
        The new local update sequence number after applying all updates.

    Examples
    --------
    >>> db = Rekordbox6Database()
    >>> db.get_local_usn()
    70500

    >>> content = db.get_content().first()
    >>> playlist = db.get_playlist().first()
    >>> content.Title = "New Title"
    >>> playlist.Name = "New Name"
    >>> db.autoincrement_usn(set_row_usn=True)
    >>> db.get_local_usn()
    70502
    """
    new_usn = self.registry.autoincrement_local_update_count(set_row_usn)
    return new_usn
def flush(self) -> None:
    """Flush the pending changes of the SQLAlchemy session.

    Raises
    ------
    SessionNotInitializedError
        If no session is open.
    """
    session = self.session
    if session is None:
        raise SessionNotInitializedError()
    session.flush()
def commit(self, autoinc: bool = True) -> None:
    """Commit the changes made to the database.

    After the commit the ``updated_at`` timestamps of the playlists are
    synchronized into the masterPlaylists6.xml file, if that file was found
    when the database was opened.

    Parameters
    ----------
    autoinc : bool, optional
        If True, auto-increment the local and row USN's before committing the
        changes made to the database.

    Raises
    ------
    SessionNotInitializedError
        If no session is open.
    RuntimeError
        If Rekordbox is currently running.

    See Also
    --------
    autoincrement_usn : Auto-increments the local Rekordbox USN's.
    """
    if self.session is None:
        raise SessionNotInitializedError()

    if get_rekordbox_pid():
        raise RuntimeError(
            "Rekordbox is running. Please close Rekordbox before commiting changes."
        )

    if autoinc:
        self.registry.autoincrement_local_update_count(set_row_usn=True)

    self.session.commit()
    self.registry.clear_buffer()

    # Without a masterPlaylists6.xml there is nothing left to synchronize
    playlist_xml = self.playlist_xml
    if playlist_xml is None:
        return

    # Sync the updated_at values of the playlists in the DB and the XML file
    for pl in self.get_playlist():
        entry = playlist_xml.get(pl.ID)
        if entry is None:
            # Special playlists are not expected in the XML, so don't warn
            if pl.ID not in SPECIAL_PLAYLIST_IDS:
                logger.warning(
                    f"Playlist {pl.ID} not found in masterPlaylists6.xml! "
                    "Did you add it manually? "
                    "Use the create_playlist method instead."
                )
            continue

        delta = pl.updated_at - entry["Timestamp"]
        # Only rewrite the XML timestamp if it differs by more than a second
        if abs(delta.total_seconds()) > 1:
            logger.debug("Updating updated_at of playlist %s in XML", pl.ID)
            playlist_xml.update(pl.ID, updated_at=pl.updated_at)

    # Save the XML file if it was modified
    if playlist_xml.modified:
        playlist_xml.save()
def rollback(self) -> None:
    """Roll back the uncommitted changes and clear the registry buffer.

    Raises
    ------
    SessionNotInitializedError
        If no session is open.
    """
    session = self.session
    if session is None:
        raise SessionNotInitializedError()
    session.rollback()
    self.registry.clear_buffer()
# -- Table queries -----------------------------------------------------------------
def get_active_censor(self, **kwargs: Any) -> Any:
    """Build a query on the ``DjmdActiveCensor`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.DjmdActiveCensor).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_album(self, **kwargs: Any) -> Any:
    """Build a query on the ``DjmdAlbum`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.DjmdAlbum).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_artist(self, **kwargs: Any) -> Any:
    """Build a query on the ``DjmdArtist`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.DjmdArtist).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_category(self, **kwargs: Any) -> Any:
    """Build a query on the ``DjmdCategory`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.DjmdCategory).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_color(self, **kwargs: Any) -> Any:
    """Build a query on the ``DjmdColor`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.DjmdColor).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_content(self, **kwargs: Any) -> Any:
    """Build a query on the ``DjmdContent`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.DjmdContent).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
# noinspection PyUnresolvedReferences
def search_content(self, text: str) -> List[DjmdContent]:
    """Search the ``DjmdContent`` table for a text fragment.

    A content row matches if ``text`` is contained in one of its own columns
    `Title`, `Commnt` or `SearchStr`, or in the name of one of its related
    rows:

    - `Artist`, `OrgArtist`, `Composer`, `Remixer` (``DjmdArtist.Name``)
    - `Album` (``DjmdAlbum.Name``)
    - `Genre` (``DjmdGenre.Name``)
    - `Key` (``DjmdKey.ScaleName``)

    Parameters
    ----------
    text : str
        The search text.

    Returns
    -------
    results : list[DjmdContent]
        The matching content elements without duplicates, sorted by ``ID``.
    """
    # Columns stored directly on the content row
    own_columns = or_(
        DjmdContent.Title.contains(text),
        DjmdContent.Commnt.contains(text),
        DjmdContent.SearchStr.contains(text),
    )
    matches = set(self.query(tables.DjmdContent).filter(own_columns).all())

    # Relationship of the content row -> name column of the related table
    related_columns = [
        ("Artist", tables.DjmdArtist.Name),
        ("OrgArtist", tables.DjmdArtist.Name),
        ("Composer", tables.DjmdArtist.Name),
        ("Remixer", tables.DjmdArtist.Name),
        ("Album", tables.DjmdAlbum.Name),
        ("Genre", tables.DjmdGenre.Name),
        ("Key", tables.DjmdKey.ScaleName),
    ]
    for relation, column in related_columns:
        joined = self.query(DjmdContent).join(getattr(DjmdContent, relation))
        matches.update(joined.filter(column.contains(text)).all())

    return sorted(matches, key=lambda item: item.ID)
def get_cue(self, **kwargs: Any) -> Any:
    """Build a query on the ``DjmdCue`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.DjmdCue).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_device(self, **kwargs: Any) -> Any:
    """Build a query on the ``DjmdDevice`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.DjmdDevice).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_genre(self, **kwargs: Any) -> Any:
    """Build a query on the ``DjmdGenre`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.DjmdGenre).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_history(self, **kwargs: Any) -> Any:
    """Build a query on the ``DjmdHistory`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.DjmdHistory).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_history_songs(self, **kwargs: Any) -> Any:
    """Build a query on the ``DjmdSongHistory`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.DjmdSongHistory).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_hot_cue_banklist(self, **kwargs: Any) -> Any:
    """Build a query on the ``DjmdHotCueBanklist`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.DjmdHotCueBanklist).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_hot_cue_banklist_songs(self, **kwargs: Any) -> Any:
    """Build a query on the ``DjmdSongHotCueBanklist`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.DjmdSongHotCueBanklist).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_key(self, **kwargs: Any) -> Any:
    """Build a query on the ``DjmdKey`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.DjmdKey).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_label(self, **kwargs: Any) -> Any:
    """Build a query on the ``DjmdLabel`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.DjmdLabel).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_menu_items(self, **kwargs: Any) -> Any:
    """Build a query on the ``DjmdMenuItems`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.DjmdMenuItems).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_mixer_param(self, **kwargs: Any) -> Any:
    """Build a query on the ``DjmdMixerParam`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.DjmdMixerParam).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_my_tag(self, **kwargs: Any) -> Any:
    """Build a query on the ``DjmdMyTag`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.DjmdMyTag).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_my_tag_songs(self, **kwargs: Any) -> Any:
    """Build a query on the ``DjmdSongMyTag`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.DjmdSongMyTag).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_playlist(self, **kwargs: Any) -> Any:
    """Build a query on the ``DjmdPlaylist`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.DjmdPlaylist).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_playlist_songs(self, **kwargs: Any) -> Any:
    """Build a query on the ``DjmdSongPlaylist`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.DjmdSongPlaylist).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_playlist_contents(self, playlist: PlaylistLike, *entities: tables.Base) -> Any:
    """Query the contents of a regular or smart playlist.

    For a smart playlist the filter is built from its stored smart list
    definition, for a regular playlist from its ``DjmdSongPlaylist`` entries.

    Parameters
    ----------
    playlist : DjmdPlaylist or int or str
        The playlist instance. Can either be a :class:`DjmdPlaylist` object
        or a playlist ID.
    *entities : Base
        The table objects for which the query is created. If no entities are
        given, the query will return the :class:`DjmdContent` objects.

    Returns
    -------
    query : sqlalchemy.orm.query.Query
        The SQLAlchemy ``Query`` object. The query contains the content
        instances or the selected columns if ``entities`` are given.

    Raises
    ------
    ValueError
        If the playlist is a playlist folder.

    Examples
    --------
    Return the content instances in the playlist

    >>> db = Rekordbox6Database()
    >>> pl = db.get_playlist(Name="My Playlist").one()
    >>> db.get_playlist_contents(pl).all()
    [<DjmdContent(12345678 Title=Title1)>, <DjmdContent(23456789 Title=Title2)>]

    Return only the content IDs

    >>> db.get_playlist_contents(pl, DjmdContent.ID).all()
    [('12345678',), ('23456789',)]
    """
    by_id = isinstance(playlist, (int, str))
    plist = self.get_playlist(ID=playlist) if by_id else playlist

    if plist.is_folder:
        raise ValueError(f"Playlist {plist} is a playlist folder.")

    # Default to full content rows if no explicit selection was requested
    selection = entities if entities else (DjmdContent,)

    if plist.is_smart_playlist:
        smartlist = SmartList()
        smartlist.parse(plist.SmartList)
        condition = smartlist.filter_clause()
    else:
        song_table = tables.DjmdSongPlaylist
        content_ids = self.query(song_table.ContentID).filter(
            song_table.PlaylistID == plist.ID
        )
        condition = DjmdContent.ID.in_(select(content_ids.subquery()))

    return self.query(*selection).filter(condition)
def get_property(self, **kwargs: Any) -> Any:
    """Build a query on the ``DjmdProperty`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.DjmdProperty).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_sampler(self, **kwargs: Any) -> Any:
    """Build a query on the ``DjmdSampler`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.DjmdSampler).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_sampler_songs(self, **kwargs: Any) -> Any:
    """Build a query on the ``DjmdSongSampler`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.DjmdSongSampler).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_tag_list_songs(self, **kwargs: Any) -> Any:
    """Build a query on the ``DjmdSongTagList`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.DjmdSongTagList).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_sort(self, **kwargs: Any) -> Any:
    """Build a query on the ``DjmdSort`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.DjmdSort).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_agent_registry(self, **kwargs: Any) -> Any:
    """Build a query on the ``AgentRegistry`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.AgentRegistry).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_cloud_agent_registry(self, **kwargs: Any) -> Any:
    """Build a query on the ``CloudAgentRegistry`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.CloudAgentRegistry).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_content_active_censor(self, **kwargs: Any) -> Any:
    """Build a query on the ``ContentActiveCensor`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.ContentActiveCensor).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_content_cue(self, **kwargs: Any) -> Any:
    """Build a query on the ``ContentCue`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.ContentCue).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_content_file(self, **kwargs: Any) -> Any:
    """Build a query on the ``ContentFile`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.ContentFile).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_hot_cue_banklist_cue(self, **kwargs: Any) -> Any:
    """Build a query on the ``HotCueBanklistCue`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.HotCueBanklistCue).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_image_file(self, **kwargs: Any) -> Any:
    """Build a query on the ``ImageFile`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.ImageFile).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_setting_file(self, **kwargs: Any) -> Any:
    """Build a query on the ``SettingFile`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.SettingFile).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
def get_uuid_map(self, **kwargs: Any) -> Any:
    """Build a query on the ``UuidIDMap`` table filtered by column values.

    If ``ID`` or ``registry_id`` is one of the filters, the single matching
    row (or ``None`` if there is none) is returned instead of the query.
    """
    filtered = self.query(tables.UuidIDMap).filter_by(**kwargs)
    return _parse_query_result(filtered, kwargs)
# -- Database updates --------------------------------------------------------------
def generate_unused_id(
    self, table: Type[tables.Base], is_28_bit: bool = True, id_field_name: str = "ID"
) -> int:
    """Generate a random ID that is not yet used in the given table.

    Parameters
    ----------
    table : type[tables.Base]
        The table class to generate the ID for.
    is_28_bit : bool, optional
        If True, the random 32-bit value is shifted right by 4 bits, which
        leaves a 28-bit value.
    id_field_name : str, optional
        The name of the ID column of the table.

    Returns
    -------
    id_ : int
        A random ID of at least 100 for which no row exists in ``table``.

    Raises
    ------
    ValueError
        If no unused ID was found within 1000000 attempts.
    """
    id_column = getattr(table, id_field_name)
    shift = 4 if is_28_bit else 0

    for _attempt in range(1000000):
        # Random 32-bit big-endian integer, optionally reduced to 28 bits
        candidate = int.from_bytes(secrets.token_bytes(4), "big") >> shift
        if candidate < 100:
            continue
        # Check if ID is already used
        rows_with_id = self.query(id_column).filter(id_column == candidate)
        if not self.query(rows_with_id.exists()).scalar():
            return candidate

    raise ValueError("Could not generate unused ID")
def add_to_playlist(
    self, playlist: PlaylistLike, content: ContentLike, track_no: Optional[int] = None
) -> tables.DjmdSongPlaylist:
    """Adds a track to a playlist.

    Creates a new :class:`DjmdSongPlaylist` object corresponding to the given
    content and adds it to the playlist.

    Parameters
    ----------
    playlist : DjmdPlaylist or int or str
        The playlist to add the track to. Can either be a :class:`DjmdPlaylist`
        object or a playlist ID.
    content : DjmdContent or int or str
        The content to add to the playlist. Can either be a :class:`DjmdContent`
        object or a content ID.
    track_no : int, optional
        The track number to add the content to. If not specified, the track
        will be added to the end of the playlist.

    Returns
    -------
    song: DjmdSongPlaylist
        The song playlist object that was created from the content.

    Raises
    ------
    ValueError : If the playlist or content ID does not exist.
    ValueError : If the playlist is a folder or smart playlist.
    ValueError : If the track number is less than 1 or too large.

    Examples
    --------
    Add a track to the end of a playlist:

    >>> db = Rekordbox6Database()
    >>> cid = 12345  # Content ID
    >>> pid = 56789  # Playlist ID
    >>> db.add_to_playlist(pid, cid)
    <DjmdSongPlaylist(c803dfde-2236-4659-b3d7-e57221663375)>

    Add a track to the beginning of a playlist:

    >>> new_song = db.add_to_playlist(pid, cid, track_no=1)
    >>> new_song.TrackNo
    1
    """
    plist: DjmdPlaylist
    cont: DjmdContent
    if isinstance(playlist, (int, str)):
        plist = self.get_playlist(ID=playlist)
        if plist is None:
            raise ValueError(f"Playlist with ID={playlist} does not exist")
    else:
        plist = playlist
    if isinstance(content, (int, str)):
        cont = self.get_content(ID=content)
        if cont is None:
            raise ValueError(f"Content with ID={content} does not exist")
    else:
        cont = content

    # Check playlist attribute (can't be folder or smart playlist)
    if plist.Attribute != 0:
        raise ValueError("Playlist must be a normal playlist")

    uuid = str(uuid4())
    id_ = str(uuid4())
    now = datetime.datetime.now()
    nsongs = self.query(tables.DjmdSongPlaylist).filter_by(PlaylistID=plist.ID).count()

    insert_at_end = track_no is None
    if insert_at_end:
        track_no = nsongs + 1
    else:
        track_no = int(track_no)
        if track_no < 1:
            raise ValueError("Track number must be greater than 0")
        if track_no > nsongs + 1:
            raise ValueError(f"Track number too high, parent contains {nsongs} items")

    cid = cont.ID
    pid = plist.ID

    logger.info("Adding content with ID=%s to playlist with ID=%s:", cid, pid)
    logger.debug("Content ID:  %s", cid)
    logger.debug("Playlist ID: %s", pid)
    logger.debug("ID:          %s", id_)
    logger.debug("UUID:        %s", uuid)
    logger.debug("TrackNo:     %s", track_no)

    moved = list()
    if not insert_at_end:
        # Make room for the new track: shift every track at or after the
        # insert position by one. Tracking is re-enabled even if this fails.
        self.registry.disable_tracking()
        try:
            query = (
                self.query(tables.DjmdSongPlaylist)
                .filter(
                    tables.DjmdSongPlaylist.PlaylistID == plist.ID,
                    tables.DjmdSongPlaylist.TrackNo >= track_no,
                )
                .order_by(tables.DjmdSongPlaylist.TrackNo)
            )
            for other_song in query:
                other_song.TrackNo += 1
                other_song.updated_at = now
                moved.append(other_song)
        finally:
            self.registry.enable_tracking()

    # Add song to playlist
    song: tables.DjmdSongPlaylist = tables.DjmdSongPlaylist.create(
        ID=id_,
        PlaylistID=str(pid),
        ContentID=str(cid),
        TrackNo=track_no,
        UUID=uuid,
        created_at=now,
        updated_at=now,
    )
    self.add(song)
    if not insert_at_end:
        moved.append(song)
        self.registry.on_move(moved)

    return song
def remove_from_playlist(
    self,
    playlist: PlaylistLike,
    song: Union[tables.DjmdSongPlaylist, int, str],
) -> None:
    """Remove a track from a playlist and close the gap in the track numbers.

    The removal itself is committed right away; the track numbers of the
    following songs are then decremented in the session.

    Parameters
    ----------
    playlist : DjmdPlaylist or int or str
        The playlist to remove the track from. Can either be a
        :class:`DjmdPlaylist` object or a playlist ID.
    song : DjmdSongPlaylist or int or str
        The song to remove from the playlist. Can either be a
        :class:`DjmdSongPlaylist` object or a song ID.

    Raises
    ------
    ValueError
        If ``song`` is neither a :class:`DjmdSongPlaylist` nor a song ID.

    Examples
    --------
    Remove a track from a playlist:

    >>> db = Rekordbox6Database()
    >>> pid = 56789
    >>> pl = db.get_playlist(ID=pid)
    >>> song = pl.Songs[0]
    >>> db.remove_from_playlist(pl, song)
    """
    song_table = tables.DjmdSongPlaylist

    plist = self.get_playlist(ID=playlist) if isinstance(playlist, (int, str)) else playlist
    if isinstance(song, (int, str)):
        plist_song = self.query(song_table).filter_by(ID=song).one()
    else:
        plist_song = song
    if not isinstance(plist_song, song_table):
        raise ValueError(
            "Playlist must be a DjmdSongPlaylist or corresponding playlist song ID!"
        )

    logger.info("Removing song with ID=%s from playlist with ID=%s", plist_song.ID, plist.ID)
    now = datetime.datetime.now()

    # Remember the position before the row is gone, then remove and commit it
    removed_track_no = plist_song.TrackNo
    self.delete(plist_song)
    self.commit()

    # Every song behind the removed one moves up by one position
    following = (
        self.query(song_table)
        .filter(
            song_table.PlaylistID == plist.ID,
            song_table.TrackNo > removed_track_no,
        )
        .order_by(song_table.TrackNo)
    )
    moved = []
    with self.registry.disabled():
        for other_song in following:
            other_song.TrackNo -= 1
            other_song.updated_at = now
            moved.append(other_song)

    if moved:
        self.registry.on_move(moved)
def move_song_in_playlist(
    self,
    playlist: PlaylistLike,
    song: Union[tables.DjmdSongPlaylist, int, str],
    new_track_no: int,
) -> None:
    """Sets a new track number of a song.

    Also updates the track numbers of the other songs in the playlist.

    Parameters
    ----------
    playlist : DjmdPlaylist or int or str
        The playlist the track is in. Can either be a :class:`DjmdPlaylist`
        object or a playlist ID.
    song : DjmdSongPlaylist or int or str
        The song to move inside the playlist. Can either be a
        :class:`DjmdSongPlaylist` object or a song ID.
    new_track_no : int
        The new track number of the song. Must be greater than 0 and at most
        one more than the number of songs in the playlist.

    Raises
    ------
    ValueError
        If the playlist ID does not exist or the track number is out of range.

    Examples
    --------
    Take a playlist containing a few tracks:

    >>> db = Rekordbox6Database()
    >>> pid = 56789
    >>> pl = db.get_playlist(ID=pid)
    >>> songs = sorted(pl.Songs, key=lambda x: x.TrackNo)
    >>> [s.Content.Title for s in songs]  # noqa
    ['Demo Track 1', 'Demo Track 2', 'HORN', 'NOISE']

    Move a track forward in a playlist:

    >>> song = songs[2]
    >>> db.move_song_in_playlist(pl, song, new_track_no=1)
    >>> [s.Content.Title for s in sorted(pl.Songs, key=lambda x: x.TrackNo)]  # noqa
    ['HORN', 'Demo Track 1', 'Demo Track 2', 'NOISE']

    Move a track backward in a playlist:

    >>> song = songs[1]
    >>> db.move_song_in_playlist(pl, song, new_track_no=4)
    >>> [s.Content.Title for s in sorted(pl.Songs, key=lambda x: x.TrackNo)]  # noqa
    ['Demo Track 1', 'HORN', 'NOISE', 'Demo Track 2']
    """
    plist: DjmdPlaylist
    plist_song: DjmdSongPlaylist
    if isinstance(playlist, (int, str)):
        plist = self.get_playlist(ID=playlist)
        if plist is None:
            raise ValueError(f"Playlist with ID={playlist} does not exist")
    else:
        plist = playlist
    if isinstance(song, (int, str)):
        plist_song = self.query(tables.DjmdSongPlaylist).filter_by(ID=song).one()
    else:
        plist_song = song

    nsongs = self.query(tables.DjmdSongPlaylist).filter_by(PlaylistID=plist.ID).count()
    if new_track_no < 1:
        raise ValueError("Track number must be greater than 0")
    if new_track_no > nsongs + 1:
        raise ValueError(f"Track number too high, parent contains {nsongs} items")

    logger.info(
        "Moving song with ID=%s in playlist with ID=%s to %s",
        plist_song.ID,
        plist.ID,
        new_track_no,
    )
    now = datetime.datetime.now()
    old_track_no = plist_song.TrackNo

    # Nothing to do if the position does not change. Checked before tracking
    # is disabled, so the registry is never left in the disabled state.
    if new_track_no == old_track_no:
        return

    song_table = tables.DjmdSongPlaylist
    if new_track_no > old_track_no:
        # Songs between the old and the new position move up by one
        shift = -1
        query = (
            self.query(song_table)
            .filter(
                song_table.PlaylistID == plist.ID,
                song_table.TrackNo > old_track_no,
                song_table.TrackNo <= new_track_no,
            )
            .order_by(song_table.TrackNo)
        )
    else:
        # Songs between the new and the old position move down by one
        shift = 1
        query = self.query(song_table).filter(
            song_table.PlaylistID == plist.ID,
            song_table.TrackNo >= new_track_no,
            song_table.TrackNo < old_track_no,
        )

    moved = list()
    self.registry.disable_tracking()
    try:
        for other_song in query:
            other_song.TrackNo += shift
            other_song.updated_at = now
            moved.append(other_song)

        plist_song.TrackNo = new_track_no
        plist_song.updated_at = now
        # Report the resolved row, not the raw `song` argument (may be an ID)
        moved.append(plist_song)
    finally:
        # Re-enable tracking even if renumbering fails
        self.registry.enable_tracking()

    self.registry.on_move(moved)
def _create_playlist(
    self,
    name: str,
    seq: Optional[int],
    image_path: Optional[str],
    parent: Optional[PlaylistLike],
    smart_list: Optional[SmartList] = None,
    attribute: Optional[int] = None,
) -> DjmdPlaylist:
    """Creates a new playlist object.

    Shared implementation used by the public ``create_*`` playlist methods.

    Parameters
    ----------
    name : str
        The name of the new playlist.
    seq : int or None
        The sequence number inside the parent. If None, the new entry is
        placed after the existing children of the parent.
    image_path : str or None
        Stored in the ``ImagePath`` column of the new entry.
    parent : DjmdPlaylist or int or str or None
        The parent folder or its ID. If None, the ID ``"root"`` is used.
    smart_list : SmartList, optional
        If given, its ``playlist_id`` is set to the new ID and its XML is
        stored in the ``SmartList`` column.
    attribute : int, optional
        Stored in the ``Attribute`` column. Defaults to 0 if not given.

    Returns
    -------
    playlist : DjmdPlaylist
        The newly created playlist.

    Raises
    ------
    ValueError : If the parent does not exist or is not a folder.
    ValueError : If the sequence number is less than 1 or too high.
    """
    table = tables.DjmdPlaylist
    id_ = str(self.generate_unused_id(table, is_28_bit=True))
    uuid = str(uuid4())
    attrib = int(attribute) if attribute is not None else 0
    now = datetime.datetime.now()
    if smart_list is not None:
        # Set the playlist ID in the smart list and generate XML
        smart_list.playlist_id = id_
        smart_list_xml = smart_list.to_xml()
    else:
        smart_list_xml = None

    if parent is None:
        # If no parent is given, use root playlist
        parent_id = "root"
    elif isinstance(parent, tables.DjmdPlaylist):
        # Check if parent is a folder
        parent_id = parent.ID
        if parent.Attribute != 1:
            raise ValueError("Parent is not a folder")
    else:
        # Check if parent exists and is a folder
        parent_id = str(parent)
        query = self.query(table.ID).filter(table.ID == parent_id, table.Attribute == 1)
        if not self.query(query.exists()).scalar():
            raise ValueError("Parent does not exist or is not a folder")

    # Number of entries that currently have this parent
    n = self.get_playlist(ParentID=parent_id).count()
    logger.debug("Parent playlist with ID=%s contains %s items", parent_id, n)

    if seq is None:
        # New playlist is last in parents
        seq = n + 1
        insert_at_end = True
    else:
        # Check if sequence number is valid
        insert_at_end = False
        if seq < 1:
            raise ValueError("Sequence number must be greater than 0")
        elif seq > n + 1:
            raise ValueError(f"Sequence number too high, parent contains {n} items")

    logger.debug("ID: %s", id_)
    logger.debug("UUID: %s", uuid)
    logger.debug("Name: %s", name)
    logger.debug("Parent ID: %s", parent_id)
    logger.debug("Seq: %s", seq)
    logger.debug("Attribute: %s", attrib)
    logger.debug("Smart List: %s", smart_list_xml)
    logger.debug("Image Path: %s", image_path)

    # Update seq numbers higher than the new seq number
    if not insert_at_end:
        query = self.query(tables.DjmdPlaylist).filter(
            tables.DjmdPlaylist.ParentID == parent_id,
            tables.DjmdPlaylist.Seq >= seq,
        )
        for pl in query:
            # The Seq change is made with the registry enabled; only the
            # timestamp is written while the registry is disabled
            pl.Seq += 1
            with self.registry.disabled():
                pl.updated_at = now

    # Add new playlist to database
    # First create with name 'New playlist'
    playlist: DjmdPlaylist = table.create(
        ID=id_,
        Seq=seq,
        Name="New playlist",
        ImagePath=image_path,
        Attribute=attrib,
        ParentID=parent_id,
        SmartList=smart_list_xml,
        UUID=uuid,
        created_at=now,
        updated_at=now,
    )
    self.add(playlist)
    # Then update with correct name for correct USN
    playlist.Name = name

    # Update masterPlaylists6.xml
    if self.playlist_xml is not None:
        self.playlist_xml.add(id_, parent_id, attrib, now, lib_type=0, check_type=0)

    return playlist
def create_playlist(
    self, name: str, parent: PlaylistLike = None, seq: int = None, image_path: str = None
) -> DjmdPlaylist:
    """Creates a new (regular) playlist in the database.

    Parameters
    ----------
    name : str
        The name of the new playlist.
    parent : DjmdPlaylist or int or str, optional
        The parent folder of the new playlist, either as a
        :class:`DjmdPlaylist` object or as a playlist ID. If not given, the
        playlist is added to the root playlist.
    seq : int, optional
        The sequence number of the new playlist. If not given, the playlist
        is added at the end of the parent playlist.
    image_path : str, optional
        The path to the image file of the new playlist.

    Returns
    -------
    playlist : DjmdPlaylist
        The newly created playlist.

    Raises
    ------
    ValueError : If the parent playlist is not a folder.
    ValueError : If the sequence number is less than 1 or too large.

    Examples
    --------
    Create a new playlist in the root playlist:

    >>> db = Rekordbox6Database()
    >>> pl = db.create_playlist("My Playlist")
    >>> pl.ParentID
    'root'

    Create a new playlist in a folder:

    >>> folder = db.get_playlist(Name="My Folder").one()
    >>> pl = db.create_playlist("My Playlist", parent=folder)
    >>> pl.ParentID
    '123456'
    """
    logger.info("Creating playlist %s", name)
    playlist_kind = PlaylistType.PLAYLIST
    return self._create_playlist(
        name=name,
        seq=seq,
        image_path=image_path,
        parent=parent,
        attribute=playlist_kind,
    )
def create_playlist_folder(
    self, name: str, parent: PlaylistLike = None, seq: int = None, image_path: str = None
) -> DjmdPlaylist:
    """Creates a new playlist folder in the database.

    Parameters
    ----------
    name : str
        The name of the new playlist folder.
    parent : DjmdPlaylist or int or str, optional
        The parent folder of the new folder, either as a
        :class:`DjmdPlaylist` object or as a playlist ID. If not given, the
        folder is added to the root playlist.
    seq : int, optional
        The sequence number of the new folder. If not given, the folder is
        added at the end of the parent playlist.
    image_path : str, optional
        The path to the image file of the new folder.

    Returns
    -------
    playlist_folder : DjmdPlaylist
        The newly created playlist folder.

    Examples
    --------
    Create a new playlist folder in the root playlist:

    >>> db = Rekordbox6Database()
    >>> folder1 = db.create_playlist_folder("My Playlist Folder")
    >>> folder1.ParentID
    'root'

    Create a new playlist folder inside the other folder:

    >>> folder2 = db.create_playlist_folder("My Playlist Folder2", parent=folder1)
    >>> folder2.ParentID
    '123456'
    """
    logger.info("Creating playlist folder %s", name)
    playlist_kind = PlaylistType.FOLDER
    return self._create_playlist(
        name=name,
        seq=seq,
        image_path=image_path,
        parent=parent,
        attribute=playlist_kind,
    )
def create_smart_playlist(
    self,
    name: str,
    smart_list: SmartList,
    parent: PlaylistLike = None,
    seq: int = None,
    image_path: str = None,
) -> DjmdPlaylist:
    """Creates a new smart playlist in the database.

    Parameters
    ----------
    name : str
        The name of the new smart playlist.
    smart_list : SmartList
        The smart list conditions to use for the new playlist.
    parent : DjmdPlaylist or int or str, optional
        The parent folder of the new playlist, either as a
        :class:`DjmdPlaylist` object or as a playlist ID. If not given, the
        playlist is added to the root playlist.
    seq : int, optional
        The sequence number of the new playlist. If not given, the playlist
        is added at the end of the parent playlist.
    image_path : str, optional
        The path to the image file of the new playlist.

    Returns
    -------
    playlist : DjmdPlaylist
        The newly created playlist.

    Examples
    --------
    Create a new smart list which we will use for the new smart playlist:

    >>> smart = SmartList(logical_operator=1)  # ALL conditions must be met
    >>> smart.add_condition("genre", operator=1, value_left="House")  # is House

    Create a new smart playlist in the root playlist:

    >>> db = Rekordbox6Database()
    >>> pl = db.create_smart_playlist("My Smart Playlist", smart)
    >>> pl.ID
    '123456789'
    >>> pl.SmartList[:72]
    '<NODE Id="123456789" LogicalOperator="1" AutomaticUpdate="1"><CONDITION '
    """
    logger.info("Creating smart playlist %s", name)
    playlist_kind = PlaylistType.SMART_PLAYLIST
    return self._create_playlist(
        name=name,
        seq=seq,
        image_path=image_path,
        parent=parent,
        smart_list=smart_list,
        attribute=playlist_kind,
    )
def delete_playlist(self, playlist: PlaylistLike) -> None:
    """Deletes a playlist or playlist folder from the database.

    The sequence numbers of the following siblings are decreased by one and
    the playlist (including all of its children) is removed from
    ``masterPlaylists6.xml`` if that file is loaded.

    Parameters
    ----------
    playlist : DjmdPlaylist or int or str
        The playlist or playlist folder to delete. Can either be a
        :class:`DjmdPlaylist` object or a playlist ID.

    Raises
    ------
    ValueError : If the playlist is not a `DjmdPlaylist` or a known playlist ID.

    Examples
    --------
    Delete a playlist:

    >>> db = Rekordbox6Database()
    >>> pl = db.get_playlist(Name="My Playlist").one()
    >>> db.delete_playlist(pl)

    Delete a playlist folder:

    >>> folder = db.get_playlist(Name="My Folder").one()
    >>> db.delete_playlist(folder)
    """
    plist: DjmdPlaylist
    if isinstance(playlist, (int, str)):
        plist = self.get_playlist(ID=playlist)
    else:
        plist = playlist

    if not isinstance(plist, DjmdPlaylist):
        raise ValueError("Playlist must be a DjmdPlaylist or corresponding playlist ID!")

    if plist.Attribute == 1:
        logger.info("Deleting playlist folder '%s' with ID=%s", plist.Name, plist.ID)
    else:
        logger.info("Deleting playlist '%s' with ID=%s", plist.Name, plist.ID)

    now = datetime.datetime.now()
    seq = plist.Seq
    parent_id = plist.ParentID

    moved = []
    child_ids = []
    self.registry.disable_tracking()
    try:
        # Update seq numbers higher than the deleted seq number
        query = (
            self.query(tables.DjmdPlaylist)
            .filter(
                tables.DjmdPlaylist.ParentID == parent_id,
                tables.DjmdPlaylist.Seq > seq,
            )
            .order_by(tables.DjmdPlaylist.Seq)
        )
        for pl in query:
            pl.Seq -= 1
            pl.updated_at = now
            moved.append(pl)
        moved.append(plist)

        # Collect the IDs of the playlist and all of its descendants, level by
        # level. The first ID in 'child_ids' is always the deleted playlist.
        children = [plist]
        while children:
            new_children = []
            for child in children:
                child_ids.append(child.ID)
                new_children.extend(child.Children)
            children = new_children

        # Remove playlist from masterPlaylists6.xml
        if self.playlist_xml is not None:
            for pid in child_ids:
                self.playlist_xml.remove(pid)

        # Remove playlist from database
        self.delete(plist)
    finally:
        # Always restore tracking, even if one of the steps above failed
        self.registry.enable_tracking()

    if len(child_ids) > 1:
        # The playlist folder had children: one extra USN increment
        self.registry.on_delete(child_ids[1:])
    self.registry.on_delete(moved)
def move_playlist(
    self, playlist: PlaylistLike, parent: PlaylistLike = None, seq: int = None
) -> None:
    """Moves a playlist (folder) in the current parent folder or to a new one.

    Parameters
    ----------
    playlist : DjmdPlaylist or int or str
        The playlist or playlist folder to move. Can either be a
        :class:`DjmdPlaylist` object or a playlist ID.
    parent : DjmdPlaylist or int or str, optional
        The new parent playlist of the playlist. If not given, the playlist
        will be moved to `seq` in the current parent playlist.
        Can either be a :class:`DjmdPlaylist` object or a playlist ID.
    seq : int, optional
        The new sequence number of the playlist. If the `parent` argument is
        given, the playlist will be moved to `seq` in the new parent playlist
        or to the end of the new parent folder if `seq=None`. If the `parent`
        argument is not given, the playlist will be moved to `seq` in the
        current parent.

    Raises
    ------
    ValueError : If neither `parent` nor `seq` is given.
    ValueError : If the parent does not exist or is not a folder.
    ValueError : If the sequence number is missing, less than 1 or too high.

    Examples
    --------
    Take the following playlist tree:

    >>> db = Rekordbox6Database()
    >>> playlists = db.get_playlist().order_by(tables.DjmdPlaylist.Seq)
    >>> [pl.Name for pl in playlists]  # noqa
    ['Folder 1', 'Folder 2', 'Playlist 1', 'Playlist 2', 'Playlist 3']

    The playlists and folders above are all in the `root` playlist folder.

    Move a playlist in the current parent folder:

    >>> pl = db.get_playlist(Name="Playlist 2").one()  # noqa
    >>> db.move_playlist(pl, seq=2)
    >>> playlists = db.get_playlist().order_by(tables.DjmdPlaylist.Seq)
    >>> [pl.Name for pl in playlists]  # noqa
    ['Folder 1', 'Playlist 2', 'Folder 2', 'Playlist 1', 'Playlist 3']

    Move a playlist to a new parent folder:

    >>> pl = db.get_playlist(Name="Playlist 1").one()  # noqa
    >>> parent = db.get_playlist(Name="Folder 1").one()  # noqa
    >>> db.move_playlist(pl, parent=parent)
    >>> db.get_playlist(ParentID=parent.ID).all()
    ['Playlist 1']
    """
    if parent is None and seq is None:
        raise ValueError("Either parent or seq must be given")

    plist: DjmdPlaylist
    sequence: int
    if isinstance(playlist, (int, str)):
        plist = self.get_playlist(ID=playlist)
    else:
        plist = playlist

    now = datetime.datetime.now()
    table = tables.DjmdPlaylist

    if parent is None:
        # If no parent is given, keep the current parent
        parent_id = plist.ParentID
    elif isinstance(parent, tables.DjmdPlaylist):
        # Check if parent is a folder
        parent_id = parent.ID
        if parent.Attribute != 1:
            raise ValueError("Parent is not a folder")
    else:
        # Check if parent exists and is a folder
        parent_id = str(parent)
        query = self.query(table.ID).filter(table.ID == parent_id, table.Attribute == 1)
        if not self.query(query.exists()).scalar():
            raise ValueError("Parent does not exist or is not a folder")

    # Number of entries in the target parent
    n = self.get_playlist(ParentID=parent_id).count()
    old_seq = plist.Seq

    if parent_id != plist.ParentID:
        # Move to new parent

        old_parent_id = plist.ParentID
        if seq is None:
            # New playlist is last in parents
            sequence = n + 1
            insert_at_end = True
        else:
            sequence = seq
            # Check if sequence number is valid
            insert_at_end = False
            if sequence < 1:
                raise ValueError("Sequence number must be greater than 0")
            elif sequence > n + 1:
                raise ValueError(f"Sequence number too high, parent contains {n} items")

        other_playlists = []
        if not insert_at_end:
            # Get all playlists in the new parent at or after the target seq
            query = (
                self.query(tables.DjmdPlaylist)
                .filter(
                    tables.DjmdPlaylist.ParentID == parent_id,
                    tables.DjmdPlaylist.Seq >= sequence,
                )
                .order_by(tables.DjmdPlaylist.Seq)
            )
            other_playlists = query.all()

        # Set seq number and update time *before* other playlists to ensure
        # right USN increment order
        plist.ParentID = parent_id
        with self.registry.disabled():
            plist.Seq = sequence
            plist.updated_at = now

        # Update seq numbers higher than the new seq number in *new* parent
        # (the list is empty when inserting at the end)
        for pl in other_playlists:
            pl.Seq += 1
            # Each move counts as one USN increment, so disable for update time
            with self.registry.disabled():
                pl.updated_at = now

        # Update seq numbers higher than the old seq number in *old* parent
        # USN is not updated here
        self.registry.disable_tracking()
        try:
            query = (
                self.query(tables.DjmdPlaylist)
                .filter(
                    tables.DjmdPlaylist.ParentID == old_parent_id,
                    tables.DjmdPlaylist.Seq > old_seq,
                )
                .order_by(tables.DjmdPlaylist.Seq)
            )
            for pl in query:
                pl.Seq -= 1
                pl.updated_at = now
        finally:
            # Always restore tracking, even if the renumbering failed
            self.registry.enable_tracking()

    else:
        # Keep parent, only change seq number

        if seq is None:
            raise ValueError("Sequence number must be given")
        sequence = seq
        if sequence < 1:
            raise ValueError("Sequence number must be greater than 0")
        elif sequence > n + 1:
            raise ValueError(f"Sequence number too high, parent contains {n} items")

        if sequence > old_seq:
            # Playlists in (old_seq, sequence] move one position up
            query = (
                self.query(tables.DjmdPlaylist)
                .filter(
                    tables.DjmdPlaylist.ParentID == plist.ParentID,
                    old_seq < tables.DjmdPlaylist.Seq,
                    tables.DjmdPlaylist.Seq <= sequence,
                )
                .order_by(tables.DjmdPlaylist.Seq)
            )
            other_playlists = query.all()
            delta_seq = -1
        elif sequence < old_seq:
            # Playlists in [sequence, old_seq) move one position down
            query = (
                self.query(tables.DjmdPlaylist)
                .filter(
                    tables.DjmdPlaylist.ParentID == plist.ParentID,
                    sequence <= tables.DjmdPlaylist.Seq,
                    tables.DjmdPlaylist.Seq < old_seq,
                )
                .order_by(tables.DjmdPlaylist.Seq)
            )
            other_playlists = query.all()
            delta_seq = +1
        else:
            # Already at the requested position
            return

        # Set seq number and update time *before* other playlists to ensure
        # right USN increment order
        plist.Seq = sequence
        # Each move counts as one USN increment, so disable for update time
        with self.registry.disabled():
            plist.updated_at = now

        # Set seq number and update time for playlists between old_seq and seq
        for pl in other_playlists:
            pl.Seq += delta_seq
            # Each move counts as one USN increment, so disable for update time
            with self.registry.disabled():
                pl.updated_at = now
def rename_playlist(self, playlist: PlaylistLike, name: str) -> None:
    """Renames a playlist or playlist folder.

    Parameters
    ----------
    playlist : DjmdPlaylist or int or str
        The playlist or playlist folder to rename. Can either be a
        :class:`DjmdPlaylist` object or a playlist ID.
    name : str
        The new name of the playlist or playlist folder.

    Examples
    --------
    Take the following playlist tree:

    >>> db = Rekordbox6Database()
    >>> playlists = db.get_playlist().order_by(tables.DjmdPlaylist.Seq)
    >>> [pl.Name for pl in playlists]  # noqa
    ['Playlist 1', 'Playlist 2']

    Rename a playlist:

    >>> pl = db.get_playlist(Name="Playlist 1").one()  # noqa
    >>> db.rename_playlist(pl, name="Playlist new")
    >>> playlists = db.get_playlist().order_by(tables.DjmdPlaylist.Seq)
    >>> [pl.Name for pl in playlists]  # noqa
    ['Playlist new', 'Playlist 2']
    """
    by_id = isinstance(playlist, (int, str))
    target: DjmdPlaylist = self.get_playlist(ID=playlist) if by_id else playlist
    timestamp = datetime.datetime.now()

    # The name change itself is made with the registry enabled
    target.Name = name
    # The timestamp is written with the registry disabled: USN not incremented
    with self.registry.disabled():
        target.updated_at = timestamp
def add_album(
    self,
    name: str,
    artist: Union[tables.DjmdArtist, int, str] = None,
    image_path: PathLike = None,
    compilation: bool = None,
    search_str: str = None,
) -> tables.DjmdAlbum:
    """Adds a new album to the database.

    Parameters
    ----------
    name : str
        The name of the album. Must be a unique name (case-sensitive).
        If an album with the same name already exists in the database, use
        the `ID` of the existing album instead.
    artist : str or int or DjmdArtist, optional
        The artist of the album. Can either be a :class:`DjmdArtist` object
        or an artist ID.
    image_path : str, optional
        The path to the album cover image.
    compilation : bool, optional
        Whether the album is a compilation album. If not given, the default
        value of `False` is used.
    search_str : str, optional
        The search string of the album.

    Returns
    -------
    album : DjmdAlbum
        The newly created album.

    Raises
    ------
    ValueError : If an album with the same name already exists in the database.

    Examples
    --------
    Add a new album to the database:

    >>> db = Rekordbox6Database()
    >>> db.add_album(name="Album 1")
    <DjmdAlbum(148754249 Name=Album 1)>

    Add a new album to the database with an album artist:

    >>> artist = db.get_artist(Name="Artist 1").one()  # noqa
    >>> db.add_album(name="Album 2", artist=artist)
    <DjmdAlbum(148754249 Name=Album 2)>

    For setting the album of a track, the usual procedure is to first check if
    an entry with the same album name already exists in the database, and if
    not, add a new album:

    >>> name = "Album name"
    >>> content = db.get_content().one()
    >>> album = db.get_album(Name=name).one_or_none()
    >>> if album is None:
    ...     album = db.add_album(name=name)
    >>> content.AlbumID = album.ID
    """
    album_table = tables.DjmdAlbum

    # Refuse duplicates: album names have to be unique
    same_name = self.query(album_table).filter_by(Name=name)
    if same_name.count() > 0:
        raise ValueError(f"Album '{name}' already exists in database")

    # Resolve the optional album artist to its ID
    artist_id: Optional[str] = None
    if artist is not None:
        art: tables.DjmdArtist = (
            self.get_artist(ID=artist) if isinstance(artist, (int, str)) else artist
        )
        artist_id = art.ID

    new_id = self.generate_unused_id(album_table)
    new_uuid = str(uuid4())
    cover = None if image_path is None else str(image_path)
    album: tables.DjmdAlbum = album_table.create(
        ID=new_id,
        Name=name,
        AlbumArtistID=artist_id,
        ImagePath=cover,
        Compilation=compilation,
        SearchStr=search_str,
        UUID=new_uuid,
    )
    self.add(album)
    self.flush()
    return album
def add_artist(self, name: str, search_str: str = None) -> tables.DjmdArtist:
    """Adds a new artist to the database.

    Parameters
    ----------
    name : str
        The name of the artist. Must be a unique name (case-sensitive).
        If an artist with the same name already exists in the database, use
        the `ID` of the existing artist instead.
    search_str : str, optional
        The search string of the artist.

    Returns
    -------
    artist : DjmdArtist
        The newly created artist.

    Raises
    ------
    ValueError : If an artist with the same name already exists in the database.

    Examples
    --------
    Add a new artist to the database:

    >>> db = Rekordbox6Database()
    >>> db.add_artist(name="Artist 1")
    <DjmdArtist(123456789, Name='Artist 1')>

    Add a new artist to the database with a custom search string:

    >>> db.add_artist(name="Artist 2", search_str="artist 2")
    <DjmdArtist(123456789, Name='Artist 2')>

    For setting the artist of a track, the usual procedure is to first check if
    an entry with the same artist name already exists in the database, and if
    not, add a new artist:

    >>> name = "Artist name"
    >>> content = db.get_content().one()
    >>> artist = db.get_artist(Name=name).one_or_none()
    >>> if artist is None:
    ...     artist = db.add_artist(name=name)
    >>> content.ArtistID = artist.ID
    """
    artist_table = tables.DjmdArtist

    # Refuse duplicates: artist names have to be unique
    same_name = self.query(artist_table).filter_by(Name=name)
    if same_name.count() > 0:
        raise ValueError(f"Artist '{name}' already exists in database")

    new_id = self.generate_unused_id(artist_table)
    new_uuid = str(uuid4())
    artist: tables.DjmdArtist = artist_table.create(
        ID=new_id,
        Name=name,
        SearchStr=search_str,
        UUID=new_uuid,
    )
    self.add(artist)
    self.flush()
    return artist
def add_genre(self, name: str) -> tables.DjmdGenre:
    """Adds a new genre to the database.

    Parameters
    ----------
    name : str
        The name of the genre. Must be a unique name (case-sensitive).
        If a genre with the same name already exists in the database, use
        the `ID` of the existing genre instead.

    Returns
    -------
    genre : DjmdGenre
        The newly created genre.

    Raises
    ------
    ValueError : If a genre with the same name already exists in the database.

    Examples
    --------
    Add a new genre to the database:

    >>> db = Rekordbox6Database()
    >>> db.add_genre(name="Genre 1")
    <DjmdGenre(123456789 Name=Genre 1)>

    For setting the genre of a track, the usual procedure is to first check if
    an entry with the same genre name already exists in the database, and if
    not, add a new genre:

    >>> name = "Genre name"
    >>> content = db.get_content().one()
    >>> genre = db.get_genre(Name=name).one_or_none()
    >>> if genre is None:
    ...     genre = db.add_genre(name=name)
    >>> content.GenreID = genre.ID
    """
    genre_table = tables.DjmdGenre

    # Refuse duplicates: genre names have to be unique
    same_name = self.query(genre_table).filter_by(Name=name)
    if same_name.count() > 0:
        raise ValueError(f"Genre '{name}' already exists in database")

    new_id = self.generate_unused_id(genre_table)
    new_uuid = str(uuid4())
    genre: tables.DjmdGenre = genre_table.create(ID=new_id, Name=name, UUID=new_uuid)
    self.add(genre)
    self.flush()
    return genre
def add_label(self, name: str) -> tables.DjmdLabel:
    """Adds a new label to the database.

    Parameters
    ----------
    name : str
        The name of the label. Must be a unique name (case-sensitive).
        If a label with the same name already exists in the database, use
        the `ID` of the existing label instead.

    Returns
    -------
    label : DjmdLabel
        The newly created label.

    Raises
    ------
    ValueError : If a label with the same name already exists in the database.

    Examples
    --------
    Add a new label to the database:

    >>> db = Rekordbox6Database()
    >>> db.add_label(name="Label 1")
    <DjmdLabel(123456789 Name=Label 1)>

    For setting the label of a track, the usual procedure is to first check if
    an entry with the same label name already exists in the database, and if
    not, add a new label:

    >>> name = "Label name"
    >>> content = db.get_content().one()
    >>> label = db.get_label(Name=name).one_or_none()
    >>> if label is None:
    ...     label = db.add_label(name=name)
    >>> content.LabelID = label.ID
    """
    label_table = tables.DjmdLabel

    # Refuse duplicates: label names have to be unique
    same_name = self.query(label_table).filter_by(Name=name)
    if same_name.count() > 0:
        raise ValueError(f"Label '{name}' already exists in database")

    new_id = self.generate_unused_id(label_table)
    new_uuid = str(uuid4())
    label: tables.DjmdLabel = label_table.create(ID=new_id, Name=name, UUID=new_uuid)
    self.add(label)
    self.flush()
    return label
def add_content(self, path: PathLike, **kwargs: Any) -> DjmdContent:
    """Adds a new track to the database.

    Parameters
    ----------
    path : str
        Absolute path to the music file to be added. The file is accessed
        to read its size.
    **kwargs:
        Keyword arguments passed to DjmdContent on creation. These arguments
        should be a valid DjmdContent field.

    Returns
    -------
    content : DjmdContent
        The newly created track.

    Raises
    ------
    ValueError : If a track with the same path already exists in the database.
    ValueError : If the file type is invalid.

    Examples
    --------
    Add a new track to the database:

    >>> db = Rekordbox6Database()
    >>> db.add_content("/Users/foo/Downloads/banger.mp3", Title="Banger")
    <DjmdContent(123456789 Title=Banger)>
    """
    path = Path(path)
    # NOTE(review): the path is stored as `str(path)`, which uses the
    # separators of the current platform - confirm this matches the "/"
    # delimiter that `update_content_path` writes to `FolderPath`.
    path_string = str(path)
    query = self.query(tables.DjmdContent).filter_by(FolderPath=path_string)
    if query.count() > 0:
        raise ValueError(f"Track with path '{path}' already exists in database")

    id_ = self.generate_unused_id(tables.DjmdContent)
    file_id = self.generate_unused_id(tables.DjmdContent, id_field_name="rb_file_id")
    uuid = str(uuid4())
    content_link = self.get_menu_items(Name="TRACK").one()
    date_created = datetime.date.today()
    device = self.get_device().first()

    file_name_l = path.name
    file_size = path.stat().st_size
    file_type_string = path.suffix.lstrip(".").upper()
    try:
        file_type = getattr(FileType, file_type_string)
    except AttributeError:
        # A failed attribute lookup raises AttributeError (never ValueError),
        # so translate it into the documented ValueError
        raise ValueError(f"Invalid file type: {path.suffix}") from None

    content: DjmdContent = tables.DjmdContent.create(
        ID=id_,
        UUID=uuid,
        ContentLink=content_link.rb_local_usn,
        DateCreated=date_created,
        DeviceID=device.ID,
        FileNameL=file_name_l,
        FileSize=file_size,
        FileType=file_type.value,
        FolderPath=path_string,
        HotCueAutoLoad="on",
        MasterDBID=device.MasterDBID,
        MasterSongID=id_,
        StockDate=date_created,
        rb_file_id=file_id,
        **kwargs,
    )
    self.add(content)
    self.flush()
    return content
# ----------------------------------------------------------------------------------
def get_mysetting_paths(self) -> List[Path]:
    """Returns the file paths of the local Rekordbox MySetting files.

    Returns
    -------
    paths : list[Path]
        The file paths of the local MySetting files. Each path is the
        database directory joined with the `Path` of a setting file entry,
        with leading slashes and backslashes removed.
    """
    db_dir = self._db_dir
    return [db_dir / entry.Path.lstrip("/\\") for entry in self.get_setting_file()]
def get_anlz_dir(self, content: ContentLike) -> Path:
    """Returns the directory path containing the ANLZ analysis files of a track.

    Parameters
    ----------
    content : DjmdContent or int or str
        The content corresponding to a track in the Rekordbox v6 database.
        If an integer or string is passed, the database is queried for the
        ``DjmdContent`` entry with that ID.

    Returns
    -------
    anlz_dir : Path
        The path of the directory containing the analysis files for the
        content.
    """
    by_id = isinstance(content, (int, str))
    cont: DjmdContent = self.get_content(ID=content) if by_id else content

    # `AnalysisDataPath` is resolved relative to the share directory, so
    # surrounding slashes and backslashes are removed before joining
    relative_dat = Path(cont.AnalysisDataPath.strip("\\/"))
    anlz_dir: Path = self._share_dir / relative_dat.parent
    return anlz_dir
def get_anlz_paths(self, content: ContentLike) -> Dict[str, Optional[Path]]:
    """Returns all existing ANLZ analysis file paths of a track.

    Parameters
    ----------
    content : DjmdContent or int or str
        The content corresponding to a track in the Rekordbox v6 database.
        If an integer or string is passed, the database is queried for the
        ``DjmdContent`` entry with that ID.

    Returns
    -------
    anlz_paths : dict[str, Path]
        The analysis file paths for the content as dictionary. The keys of
        the dictionary are the file types ("DAT", "EXT" or "EX2").
    """
    # Inside the method body the name refers to the module-level helper
    # imported from the `anlz` package, not to this method
    return get_anlz_paths(self.get_anlz_dir(content))
def read_anlz_files(self, content: ContentLike) -> Dict[Path, AnlzFile]:
    """Reads all existing ANLZ analysis files of a track.

    Parameters
    ----------
    content : DjmdContent or int or str
        The content corresponding to a track in the Rekordbox v6 database.
        If an integer or string is passed, the database is queried for the
        ``DjmdContent`` entry with that ID.

    Returns
    -------
    anlz_files : dict[Path, AnlzFile]
        The analysis files for the content as dictionary. The keys of the
        dictionary are the file paths.
    """
    # Inside the method body the name refers to the module-level helper
    # imported from the `anlz` package, not to this method
    return read_anlz_files(self.get_anlz_dir(content))
def get_anlz_path(self, content: ContentLike, type_: str = "DAT") -> Optional[PathLike]:
    """Returns the file path of an ANLZ analysis file of a track.

    Parameters
    ----------
    content : DjmdContent or int or str
        The content corresponding to a track in the Rekordbox v6 database.
        If an integer or string is passed, the database is queried for the
        ``DjmdContent`` entry with that ID.
    type_ : str, optional
        The type of the analysis file to return. Must be one of "DAT", "EXT"
        or "EX2" (case-insensitive). "DAT" by default.

    Returns
    -------
    anlz_path : Path or None
        The file path of the analysis file for the content. If the file does
        not exist or `type_` is not a known file type, None is returned.
    """
    root = self.get_anlz_dir(content)
    paths = get_anlz_paths(root)
    # Return None (not an empty string) for unknown types, as documented
    return paths.get(type_.upper())
def read_anlz_file(self, content: ContentLike, type_: str = "DAT") -> Optional[AnlzFile]:
    """Reads an ANLZ analysis file of a track.

    Parameters
    ----------
    content : DjmdContent or int or str
        The content corresponding to a track in the Rekordbox v6 database.
        If an integer or string is passed, the database is queried for the
        ``DjmdContent`` entry with that ID.
    type_ : str, optional
        The type of the analysis file to return. Must be one of "DAT", "EXT"
        or "EX2". "DAT" by default.

    Returns
    -------
    anlz_file : AnlzFile or None
        The analysis file for the content. If the file does not exist, None
        is returned.
    """
    path = self.get_anlz_path(content, type_)
    # A missing file is reported as a falsy value
    if not path:
        return None
    return AnlzFile.parse_file(path)
def update_content_path(
    self,
    content: ContentLike,
    path: PathLike,
    save: bool = True,
    check_path: bool = True,
    commit: bool = True,
) -> None:
    """Update the file path of a track in the Rekordbox v6 database.

    This changes the `FolderPath` entry in the ``DjmdContent`` table and the
    path tag (PPTH) of the corresponding ANLZ analysis files. `OrgFolderPath`
    is updated as well if it was equal to the old path, and `FileNameL` is
    updated if the file name changed.

    Parameters
    ----------
    content : DjmdContent or int or str
        The ``DjmdContent`` element to change. If an integer or string is
        passed, the database is queried for the content.
    path : str or Path
        The new file path of the database entry.
    save : bool, optional
        If True, the changes made are written to disc.
    check_path : bool, optional
        If True, raise an assertion error if the given file path does not
        exist.
    commit : bool, optional
        If True, the changes are committed to the database. True by default.

    Raises
    ------
    AssertionError : If `check_path` is True and the given path does not exist.

    Examples
    --------
    If, for example, the file `NOISE.wav` was moved up a few directories
    (from `.../Sampler/OSC_SAMPLER/PRESET ONESHOT/` to `.../Sampler/`) the file
    could no longer be opened in Rekordbox, since the database still contains
    the old file path:

    >>> db = Rekordbox6Database()
    >>> cont = db.get_content()[0]
    >>> cont.FolderPath
    C:/Music/PioneerDJ/Sampler/OSC_SAMPLER/PRESET ONESHOT/NOISE.wav

    Updating the path changes the database entry

    >>> new_path = "C:/Music/PioneerDJ/Sampler/PRESET ONESHOT/NOISE.wav"
    >>> db.update_content_path(cont, new_path)
    >>> cont.FolderPath
    C:/Music/PioneerDJ/Sampler/PRESET ONESHOT/NOISE.wav

    and updates the file path in the corresponding ANLZ analysis files:

    >>> files = db.read_anlz_files(cont.ID)
    >>> file = list(files.values())[0]
    >>> file.get("path")
    C:/Music/PioneerDJ/Sampler/PRESET ONESHOT/NOISE.wav
    """
    cont: DjmdContent
    if isinstance(content, (int, str)):
        cont = self.get_content(ID=content)
    else:
        cont = content
    cid = cont.ID

    path = Path(path)
    # Explicit check instead of an `assert` statement so it also runs when
    # Python is started with -O; the exception type is unchanged
    if check_path and not path.exists():
        raise AssertionError(f"Path '{path}' does not exist")
    # Format path (the database and ANLZ files use "/" as path delimiter)
    new_path = str(path).replace("\\", "/")
    old_path = cont.FolderPath
    logger.info("Replacing '%s' with '%s' of content [%s]", old_path, new_path, cid)

    # Update path in ANLZ files
    anlz_files = self.read_anlz_files(cid)
    for anlz_path, anlz in anlz_files.items():
        logger.debug("Updating path of %s: %s", anlz_path, new_path)
        anlz.set_path(new_path)

    # Update path in database (DjmdContent)
    logger.debug("Updating database file path: %s", new_path)
    cont.FolderPath = new_path

    # Update the OrgFolderPath column with the new path
    # if the column matches the old_path variable
    if cont.OrgFolderPath == old_path:
        cont.OrgFolderPath = new_path

    # Update the FileNameL column with the new filename if it changed
    new_name = new_path.split("/")[-1]
    if cont.FileNameL != new_name:
        cont.FileNameL = new_name

    if save:
        logger.debug("Saving ANLZ files")
        # Save ANLZ files
        for anlz_path, anlz in anlz_files.items():
            anlz.save(anlz_path)

    if commit:
        # Commit database changes
        logger.debug("Committing changes to the database")
        self.commit()
def update_content_filename(
    self,
    content: ContentLike,
    name: str,
    save: bool = True,
    check_path: bool = True,
    commit: bool = True,
) -> None:
    """Update the file name of a track in the Rekordbox v6 database.

    This changes the `FolderPath` entry in the ``DjmdContent`` table and the
    path tag (PPTH) of the corresponding ANLZ analysis files. The directory
    of the current `FolderPath` is kept. The extension of the current file is
    applied to `name` with ``Path.with_suffix``, which replaces a suffix
    already contained in `name`.

    Parameters
    ----------
    content : DjmdContent or int or str
        The ``DjmdContent`` element to change. If an integer or string is
        passed, the database is queried for the content.
    name : str
        The new file name of the database entry.
    save : bool, optional
        If True, the changes made are written to disc.
    check_path : bool, optional
        If True, raise an assertion error if the new file path does not exist.
    commit : bool, optional
        If True, the changes are committed to the database. True by default.

    See Also
    --------
    update_content_path: Update the file path of a track in the Rekordbox database.

    Examples
    --------
    Updating the file name changes the database entry

    >>> db = Rekordbox6Database()
    >>> cont = db.get_content()[0]
    >>> cont.FolderPath
    C:/Music/PioneerDJ/Sampler/OSC_SAMPLER/PRESET ONESHOT/NOISE.wav

    >>> new_name = "noise"
    >>> db.update_content_filename(cont, new_name)
    >>> cont.FolderPath
    C:/Music/PioneerDJ/Sampler/OSC_SAMPLER/PRESET ONESHOT/noise.wav

    and updates the file path in the corresponding ANLZ analysis files:

    >>> files = db.read_anlz_files(cont.ID)
    >>> file = list(files.values())[0]
    >>> cont.FolderPath == file.get("path")
    True
    """
    by_id = isinstance(content, (int, str))
    cont: DjmdContent = self.get_content(ID=content) if by_id else content

    current = Path(cont.FolderPath)
    # Same directory, new name, extension of the current file
    renamed = (current.parent / name).with_suffix(current.suffix)
    self.update_content_path(cont, renamed, save, check_path, commit=commit)
def to_dict(self, verbose: bool = False) -> Dict[str, Any]:
    """Export the tables of the database as plain Python containers.

    Every name listed in ``tables.TABLES`` is exported, except for ``"Base"``
    and names starting with ``"Stats"``.

    Parameters
    ----------
    verbose: bool, optional
        If True, print the name of each table before it is converted.

    Returns
    -------
    dict
        A dictionary mapping each table name to a list of rows, where every
        row is a dictionary mapping column names to values.
    """
    result: Dict[str, Any] = {}
    for name in tables.TABLES:
        if name == "Base" or name.startswith("Stats"):
            continue
        if verbose:
            print(f"Converting table: {name}")

        model = getattr(tables, name)
        fields = model.columns()
        # One dict per row, keyed by column name
        result[name] = [
            {field: record[field] for field in fields}
            for record in self.query(model).all()
        ]
    return result
def to_json(
    self, file: PathLike, indent: int = 4, sort_keys: bool = True, verbose: bool = False
) -> None:
    """Write the contents of the database to a JSON file.

    The data is collected with ``to_dict`` and dumped with the standard
    ``json`` module. Date and datetime values are written as ISO 8601 strings.

    Parameters
    ----------
    file : str or Path
        The path of the JSON file to write. The file is opened in write mode.
    indent : int, optional
        The indentation passed to ``json.dump``. The default is 4.
    sort_keys : bool, optional
        Passed to ``json.dump``; if True the keys are sorted. True by default.
    verbose : bool, optional
        Passed on to ``to_dict``.

    Raises
    ------
    TypeError
        If the data contains a value that is neither JSON serializable nor a
        date/datetime object.
    """
    import json

    def to_serializable(value: Any) -> Any:
        # Fallback used by ``json.dump`` for objects it cannot encode itself
        if not isinstance(value, (datetime.datetime, datetime.date)):
            raise TypeError(f"Type {type(value)} not serializable")
        return value.isoformat()

    payload = self.to_dict(verbose=verbose)
    with open(file, "w") as stream:
        json.dump(
            payload,
            stream,
            indent=indent,
            sort_keys=sort_keys,
            default=to_serializable,
        )
def copy_unlocked(self, output_file: PathLike) -> None:
    """Copy all tables and rows of the database into a plain SQLite file.

    The schema is reflected from ``self.engine``. Tables that already exist
    in `output_file` are dropped, the reflected tables are re-created there
    and every row is copied over. Progress is printed to stdout.

    Parameters
    ----------
    output_file : str or Path
        The path of the SQLite file to write to. It is opened through a
        ``sqlite:///`` URL without a key; existing tables in it are dropped.
    """
    exclude_tables = ("sqlite_master", "sqlite_sequence", "sqlite_temp_master")

    src_engine = self.engine
    src_metadata = MetaData()
    dst_engine = create_engine(f"sqlite:///{output_file}")
    dst_metadata = MetaData()

    @event.listens_for(src_metadata, "column_reflect")
    def genericize_datatypes(inspector, tablename, column_dict):  # type: ignore # noqa: ANN202
        # Reflect source columns with generic SQLAlchemy types;
        # DateTime columns are re-declared as String in the copy.
        type_ = column_dict["type"].as_generic(allow_nulltype=True)
        if isinstance(type_, DateTime):
            type_ = String
        column_dict["type"] = type_

    try:
        # Drop all tables in the target database
        dst_metadata.reflect(bind=dst_engine)
        for table in reversed(dst_metadata.sorted_tables):
            if table.name not in exclude_tables:
                print("dropping table =", table.name)
                table.drop(bind=dst_engine)

        # Create all source tables in the target database
        src_metadata.reflect(bind=src_engine)
        for table in src_metadata.sorted_tables:
            if table.name not in exclude_tables:
                table.create(bind=dst_engine)

        # Refresh the target metadata so it describes the freshly created tables
        dst_metadata.clear()
        dst_metadata.reflect(bind=dst_engine)

        # Copy all data from source to target
        print("Copying data...")
        string = "\rCopying table {name}: Inserting row {row}"
        # The context managers make sure both connections are closed again,
        # even if copying fails half-way.
        with src_engine.connect() as src_conn, dst_engine.connect() as dst_conn:
            for table in dst_metadata.sorted_tables:
                src_table = src_metadata.tables[table.name]
                stmt = table.insert()
                # Count per table, so empty tables report 0 instead of a stale
                # value and non-empty tables report the real number of rows.
                n_rows = 0
                for index, row in enumerate(src_conn.execute(src_table.select())):
                    print(string.format(name=table.name, row=index), end="", flush=True)
                    dst_conn.execute(stmt.values(row))
                    n_rows = index + 1
                print(f"\rCopying table {table.name}: Inserted {n_rows} rows", flush=True)

            dst_conn.commit()
    finally:
        # Release the pooled connections of the temporary target engine
        dst_engine.dispose()