Skip to content

Collection

The collection package contains the following modules:

  • base_collection: abstraction for Collection
  • base_playlist: abstraction for Playlist
  • base_track: abstraction for Track
  • config: the configuration object for the collection package
  • copy_playlists: copies audio files for tracks within a set of playlists to a new location and writes a new collection with these updated paths
  • helpers: contains helper classes and functions for the other modules of this package
  • playlist_builder: constructs playlists using tags in a Collection and a defined playlist structure in collection_playlists.yaml
  • playlist_filters: abstractions and implementations for playlist filters
  • playlists: abstractions and implementations for playlists
  • rekordbox_collection: implementation of Collection for Rekordbox
  • rekordbox_playlist: implementation of Playlist for Rekordbox
  • rekordbox_track: implementation of Track for Rekordbox
  • shuffle_playlists: writes sequential numbers to tags of shuffled tracks in playlists to emulate playlist shuffling
  • tracks: abstractions and implementations for tracks

RekordboxCollection

Bases: Collection

Collection implementation for usage with Rekordbox.

Source code in djtools/collection/rekordbox_collection.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
class RekordboxCollection(Collection):
    """Collection implementation for usage with Rekordbox.

    The collection is deserialized from an XML file parsed with
    BeautifulSoup. Tracks are exposed as ``RekordboxTrack`` objects keyed by
    their ``TrackID`` and the playlist tree is wrapped in a
    ``RekordboxPlaylist`` built from the ``ROOT`` NODE Tag.
    """

    # NOTE(review): make_path is defined elsewhere; presumably it coerces
    # path-like arguments to Path objects -- confirm against its definition.
    @make_path
    def __init__(self, path: Path):
        """Deserializes a Collection from an XML file.

        Args:
            path: Path to a serialized collection.
        """
        super().__init__(path=path)
        self._path = path

        # Parse the XML as a BeautifulSoup document.
        with open(self._path, mode="r", encoding="utf-8") as _file:
            self._collection = BeautifulSoup(_file.read(), "xml")

        # Create a dict of tracks keyed by TrackID. TRACK Tags without a
        # Location attribute are skipped.
        self._tracks = {
            track["TrackID"]: RekordboxTrack(track)
            for track in self._collection.find_all("TRACK")
            if track.get("Location")
        }

        # Instantiate the Playlist(s) in this collection, starting from the
        # ROOT folder NODE.
        self._playlists = RekordboxPlaylist(
            self._collection.find("NODE", {"Name": "ROOT", "Type": "0"}),
            tracks=self._tracks,
        )

    def __repr__(self) -> str:
        """Produce a string representation of this Collection.

        Attributes with a single leading underscore are listed one per line,
        except the parsed document. Tracks are summarized as a count and
        playlists as the number of non-folder playlists in the tree.

        Returns:
            Collection represented as a string.
        """
        # Eventual repr string to return.
        string = "{}({}\n)"
        # Body to the repr string to fill out with Collection content.
        body = ""

        # Name-mangled attributes (prefixed with "_<ClassName>") aren't
        # represented. Public members (those without a leading underscore)
        # and the raw parsed document aren't represented either.
        repr_attrs = {
            key[1:]: value
            for key, value in self.__dict__.items()
            if not (
                key.startswith(f"_{type(self).__name__}")
                or not key.startswith("_")
                or key == "_collection"
            )
        }

        # Build a representation of this Collection.
        for key, value in repr_attrs.items():
            # Skip representing this collection's playlists and tracks.
            # Their representation is deferred until the end.
            if key in ["playlists", "tracks"]:
                continue

            # Represent string values with surrounding double quotes.
            if isinstance(value, (Path, str)):
                value = f'"{value}"'

            # Append the attribute's name and value to the representation.
            body += f"\n{' ' * 4}{key}={value},"

        # Represent the tracks attribute as the number of tracks.
        body += f"\n{' ' * 4}tracks={len(repr_attrs['tracks'])},"

        # Represent the playlists attribute as the total number of playlists.
        # The tree is walked depth-first; a node whose get_playlists() raises
        # RuntimeError is counted as a playlist, any other node is expanded.
        stack = list(repr_attrs["playlists"])
        playlist_count = 0
        while stack:
            playlist = stack.pop()
            try:
                stack.extend(playlist.get_playlists())
            except RuntimeError:
                playlist_count += 1
        body += f"\n{' ' * 4}playlists={playlist_count},"

        return string.format(type(self).__name__, body)

    @make_path
    def serialize(self, *args, path: Optional[Path] = None, **kwargs) -> Path:
        """Serializes this Collection as an XML file.

        Args:
            path: Path to output serialized collection to. When omitted (or
                falsy), the path this Collection was loaded from is used.

        Returns:
            Path to the serialized collection XML file.
        """
        # BeautifulSoup document.
        doc = BeautifulSoup("", features="xml")

        # Tag that contains all the playlist data.
        root_tag_name = "DJ_PLAYLISTS"

        # Retrieve this root tag from the existing document, rather than
        # building it from scratch, in case the attributes ever change.
        root_tag = bs4.Tag(
            name=root_tag_name,
            attrs=self._collection.find(root_tag_name).attrs,
        )

        # Similarly, we want to reference the existing attribute data on the
        # product Tag. It is copied so the original document keeps its own.
        root_tag.extend(
            [bs4.NavigableString("\n"), copy(self._collection.find("PRODUCT"))]
        )

        # Build the collection Tag and serialize each track into it before
        # adding the collection Tag to the root.
        collection_tag = bs4.Tag(
            name="COLLECTION", attrs={"Entries": str(len(self._tracks))}
        )
        for track in self._tracks.values():
            collection_tag.extend(
                [bs4.NavigableString("\n"), track.serialize()]
            )
        collection_tag.append(bs4.NavigableString("\n"))
        root_tag.extend([bs4.NavigableString("\n"), collection_tag])

        # Build the playlists Tag and serialize each Playlist into it before
        # adding the playlist Tag to the root.
        playlists_tag = bs4.Tag(name="PLAYLISTS")
        playlists_root_tag = bs4.Tag(
            name="NODE",
            attrs={"Type": "0", "Name": "ROOT", "Count": len(self._playlists)},
        )
        for playlist in self._playlists:
            playlists_root_tag.extend(
                [bs4.NavigableString("\n"), playlist.serialize()]
            )
        playlists_root_tag.append(bs4.NavigableString("\n"))
        playlists_tag.extend(
            [
                bs4.NavigableString("\n"),
                playlists_root_tag,
                bs4.NavigableString("\n"),
            ]
        )
        root_tag.extend(
            [
                bs4.NavigableString("\n"),
                playlists_tag,
                bs4.NavigableString("\n"),
            ]
        )
        doc.append(root_tag)

        # If no new path is provided, use the original.
        if not path:
            path = self._path

        # Write the serialized Collection to a new file.
        with open(path, mode="w", encoding="utf-8") as _file:
            _file.write(
                doc.prettify(
                    # UnsortedAttributes formatter ensures attributes are
                    # serialized in the same order as the original XML file.
                    formatter=UnsortedAttributes(
                        indent=2,
                        # CustomSubstitution is used to substitute an expanded
                        # character set in the serialized XML file.
                        entity_substitution=CustomSubstitution.substitute_xml,
                    )
                )
            )

        return path

    @classmethod
    def validate(cls, input_xml: Path, output_xml: Path):
        """Validate the serialized Collection matches the original.

        Both files are compared as text after the XML declaration of the
        output is replaced with that of the input and every run of two or
        more whitespace characters is collapsed to a single space.

        Args:
            input_xml: Path to an XML containing the original collection.
            output_xml: Path to an XML containing the serialized collection.

        Raises:
            AssertionError: A serialized Collection must exactly match the
                original XML used to deserialize from.
        """
        # Read the original and serialized collection XML files as
        # strings.
        with open(input_xml, mode="r", encoding="utf-8") as _file:
            input_xml_string = _file.read()
        with open(output_xml, mode="r", encoding="utf-8") as _file:
            output_xml_string = _file.read()

        # Rekordbox capitalizes "UTF-8" in the file declaration while
        # BeautifulSoup does not. The declaration
        # '<?xml version="1.0" encoding="UTF-8"?>' is 38 characters long, so
        # the output's first 38 characters are swapped for the input's.
        xml_declaration = input_xml_string[:38]
        output_xml_string = xml_declaration + output_xml_string[38:]

        # Replace multiple occurrences of whitespace with a single whitespace.
        # Python regexes take no "/.../g" delimiters, and the signature is
        # sub(repl, string) on a compiled pattern.
        whitespace = re.compile(r"\s{2,}")
        input_xml_string = whitespace.sub(" ", input_xml_string)
        output_xml_string = whitespace.sub(" ", output_xml_string)

        # Raise explicitly rather than using an assert statement so the check
        # still runs when Python is started with -O.
        if input_xml_string != output_xml_string:
            raise AssertionError("Failed RekordboxCollection validation!")

__init__(path)

Deserializes a Collection from an XML file.

Parameters:

Name Type Description Default
path Path

Path to a serialized collection.

required
Source code in djtools/collection/rekordbox_collection.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@make_path
def __init__(self, path: Path):
    """Builds this Collection by parsing a serialized XML file.

    Args:
        path: Path to a serialized collection.
    """
    super().__init__(path=path)
    self._path = path

    # Load the file contents and parse them with BeautifulSoup's XML parser.
    with open(self._path, mode="r", encoding="utf-8") as xml_file:
        xml_text = xml_file.read()
        self._collection = BeautifulSoup(xml_text, "xml")

    # Index every TRACK Tag that carries a Location by its TrackID.
    track_index = {}
    for track_tag in self._collection.find_all("TRACK"):
        if not track_tag.get("Location"):
            continue
        track_id = track_tag["TrackID"]
        track_index[track_id] = RekordboxTrack(track_tag)
    self._tracks = track_index

    # Wrap the ROOT folder NODE in a playlist tree that shares the tracks.
    root_node = self._collection.find("NODE", {"Name": "ROOT", "Type": "0"})
    self._playlists = RekordboxPlaylist(root_node, tracks=self._tracks)

__repr__()

Produce a string representation of this Collection.

Returns:

Type Description
str

Collection represented as a string.

Source code in djtools/collection/rekordbox_collection.py
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def __repr__(self) -> str:
    """Render this Collection as a multi-line summary string.

    Single-underscore attributes are listed one per line; tracks are shown
    as a count and playlists as the number of non-folder playlists.

    Returns:
        Collection represented as a string.
    """
    indent = " " * 4
    class_name = type(self).__name__
    mangled_prefix = f"_{class_name}"

    # Collect the attributes worth showing: only names with a leading
    # underscore, excluding name-mangled ones and the parsed document.
    shown = {}
    for attr_name, attr_value in self.__dict__.items():
        if not attr_name.startswith("_"):
            continue
        if attr_name.startswith(mangled_prefix):
            continue
        if attr_name == "_collection":
            continue
        shown[attr_name[1:]] = attr_value

    # One "name=value," line per ordinary attribute; tracks and playlists
    # are summarized separately below.
    pieces = []
    for name, attr_value in shown.items():
        if name in ("playlists", "tracks"):
            continue
        # Path and string values are wrapped in double quotes.
        rendered = (
            f'"{attr_value}"'
            if isinstance(attr_value, (Path, str))
            else attr_value
        )
        pieces.append(f"\n{indent}{name}={rendered},")

    # Tracks are summarized by how many there are.
    track_total = len(shown["tracks"])
    pieces.append(f"\n{indent}tracks={track_total},")

    # Walk the playlist tree depth-first. Nodes whose get_playlists() raises
    # RuntimeError are counted; all other nodes are expanded.
    pending = list(shown["playlists"])
    leaf_total = 0
    while pending:
        node = pending.pop()
        try:
            pending.extend(node.get_playlists())
        except RuntimeError:
            leaf_total += 1
    pieces.append(f"\n{indent}playlists={leaf_total},")

    body = "".join(pieces)
    return f"{class_name}({body}\n)"

serialize(*args, path=None, **kwargs)

Serializes this Collection as an XML file.

Parameters:

Name Type Description Default
path Optional[Path]

Path to output serialized collection to.

None

Returns:

Type Description
Path

Path to the serialized collection XML file.

Source code in djtools/collection/rekordbox_collection.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
@make_path
def serialize(self, *args, path: Optional[Path] = None, **kwargs) -> Path:
    """Writes this Collection out as an XML file.

    Args:
        path: Path to output serialized collection to. When omitted (or
            falsy), the path this Collection was loaded from is used.

    Returns:
        Path to the serialized collection XML file.
    """

    def newline() -> bs4.NavigableString:
        # A fresh node per call: each newline gets its own place in the tree.
        return bs4.NavigableString("\n")

    # Empty XML document that will receive the rebuilt tree.
    document = BeautifulSoup("", features="xml")

    # Reuse the attributes of the existing top-level Tag instead of
    # hard-coding them, in case they ever change.
    top_level_name = "DJ_PLAYLISTS"
    existing_top_level = self._collection.find(top_level_name)
    dj_playlists = bs4.Tag(name=top_level_name, attrs=existing_top_level.attrs)

    # The PRODUCT Tag is likewise taken (as a copy) from the parsed document.
    product = copy(self._collection.find("PRODUCT"))
    dj_playlists.extend([newline(), product])

    # COLLECTION holds one serialized Tag per track.
    track_total = str(len(self._tracks))
    collection = bs4.Tag(name="COLLECTION", attrs={"Entries": track_total})
    for track in self._tracks.values():
        collection.extend([newline(), track.serialize()])
    collection.append(newline())
    dj_playlists.extend([newline(), collection])

    # PLAYLISTS holds a single ROOT folder NODE containing every playlist.
    playlists = bs4.Tag(name="PLAYLISTS")
    root_attrs = {"Type": "0", "Name": "ROOT", "Count": len(self._playlists)}
    root_node = bs4.Tag(name="NODE", attrs=root_attrs)
    for playlist in self._playlists:
        root_node.extend([newline(), playlist.serialize()])
    root_node.append(newline())
    playlists.extend([newline(), root_node, newline()])
    dj_playlists.extend([newline(), playlists, newline()])
    document.append(dj_playlists)

    # Fall back to the original location when no destination was given.
    destination = path if path else self._path

    with open(destination, mode="w", encoding="utf-8") as xml_file:
        # UnsortedAttributes keeps attributes in their original order and
        # CustomSubstitution substitutes an expanded character set.
        formatter = UnsortedAttributes(
            indent=2,
            entity_substitution=CustomSubstitution.substitute_xml,
        )
        xml_file.write(document.prettify(formatter=formatter))

    return destination

validate(input_xml, output_xml) classmethod

Validate the serialized Collection matches the original.

Parameters:

Name Type Description Default
input_xml Path

Path to an XML containing the original collection.

required
output_xml Path

Path to an XML containing the serialized collection.

required

Raises:

Type Description
AssertionError

A serialized Collection must exactly match the original XML used to deserialize from.

Source code in djtools/collection/rekordbox_collection.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
@classmethod
def validate(cls, input_xml: Path, output_xml: Path):
    """Validate the serialized Collection matches the original.

    Both files are compared as text after the XML declaration of the output
    is replaced with that of the input and every run of two or more
    whitespace characters is collapsed to a single space.

    Args:
        input_xml: Path to an XML containing the original collection.
        output_xml: Path to an XML containing the serialized collection.

    Raises:
        AssertionError: A serialized Collection must exactly match the
            original XML used to deserialize from.
    """
    # Read the original and serialized collection XML files as
    # strings.
    with open(input_xml, mode="r", encoding="utf-8") as _file:
        input_xml_string = _file.read()
    with open(output_xml, mode="r", encoding="utf-8") as _file:
        output_xml_string = _file.read()

    # Rekordbox capitalizes "UTF-8" in the file declaration while
    # BeautifulSoup does not. The declaration
    # '<?xml version="1.0" encoding="UTF-8"?>' is 38 characters long, so the
    # output's first 38 characters are swapped for the input's.
    xml_declaration = input_xml_string[:38]
    output_xml_string = xml_declaration + output_xml_string[38:]

    # Replace multiple occurrences of whitespace with a single whitespace.
    # Python regexes take no "/.../g" delimiters, and the signature is
    # sub(repl, string) on a compiled pattern.
    whitespace = re.compile(r"\s{2,}")
    input_xml_string = whitespace.sub(" ", input_xml_string)
    output_xml_string = whitespace.sub(" ", output_xml_string)

    # Raise explicitly rather than using an assert statement so the check
    # still runs when Python is started with -O.
    if input_xml_string != output_xml_string:
        raise AssertionError("Failed RekordboxCollection validation!")

RekordboxPlaylist

Bases: Playlist

Playlist implementation for usage with Rekordbox.

Source code in djtools/collection/rekordbox_playlist.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
class RekordboxPlaylist(Playlist):
    """Playlist implementation for usage with Rekordbox.

    A playlist wraps a NODE Tag. The Tag's attributes are copied onto the
    instance with a leading underscore (e.g. ``Name`` becomes ``_Name``).
    A node whose ``Type`` is ``"0"`` is a folder holding sub-playlists in
    ``_playlists``; any other node holds tracks in ``_tracks``.
    """

    def __init__(
        self,
        playlist: bs4.element.Tag,
        *args,
        tracks: Optional[Dict[str, RekordboxTrack]] = None,
        playlist_tracks: Optional[Dict[str, RekordboxTrack]] = None,
        parent: Optional["RekordboxPlaylist"] = None,
        **kwargs,
    ):
        """Deserialize a Playlist from a BeautifulSoup NODE Tag.

        Args:
            playlist: BeautifulSoup Tag representing a playlist.
            tracks: All the tracks in this collection.
            playlist_tracks: Tracks to set when initializing with new_playlist.
                NOTE(review): only iterated for track IDs here, and
                new_playlist passes dict keys rather than a dict -- the
                annotation looks broader than the actual usage; confirm.
            parent: The folder this playlist is in.
        """
        # Remaining positional and keyword arguments are forwarded to the
        # base class.
        super().__init__(*args, **kwargs)
        self._tracks = None
        self._playlists = None
        self._parent = parent
        tracks = tracks or {}

        # Set this object's attributes with the NODE Tag's attributes.
        for key, value in playlist.attrs.items():
            setattr(self, f"_{key}", value)

        # Recursively instantiate sub-playlists.
        if self.is_folder():
            # Only Tag children are playlists; other children (e.g. text
            # nodes) are filtered out.
            self._playlists = [
                RekordboxPlaylist(playlist, tracks=tracks, parent=self)
                for playlist in filter(
                    lambda x: isinstance(x, bs4.element.Tag), playlist.children
                )
            ]
        # Deserialize tracks from a leaf node playlist.
        else:
            # Filter the children elements for Tags and get the key attribute.
            # This is skipped when a non-empty playlist_tracks was supplied.
            if not playlist_tracks:
                playlist_tracks = [
                    track.get("Key")
                    for track in filter(
                        lambda x: isinstance(x, bs4.element.Tag),
                        playlist.children,
                    )
                ]
            # Create a dict of tracks. Every ID must exist in tracks,
            # otherwise the lookup raises KeyError.
            self._tracks = {
                track_id: tracks[track_id] for track_id in playlist_tracks
            }

    def __repr__(self) -> str:
        """Produces a string representation of this playlist.

        Folders are rendered with their sub-playlists as an indented,
        nested list; the indentation depends on how deeply this call is
        nested inside other ``__repr__`` calls from this module.

        Returns:
            Playlist represented as a string.
        """
        # Eventual repr string to return.
        string = "{}{}({}{})"
        # Body of the repr string to fill out with playlist contents.
        body = ""
        # Get the repr recursion depth to determine the degree of indentation.
        # The depth is the number of active __repr__ frames coming from a
        # file named rekordbox_playlist.py, not counting this one.
        depth = (
            len(
                [
                    frame
                    for frame in inspect.stack()
                    if frame[3] == "__repr__"
                    and Path(frame[1]).name == "rekordbox_playlist.py"
                ]
            )
            - 1
        )
        # These variables are used to control indentation level.
        extra = 1 if depth else 0
        padding = f"{' ' * 4 * (depth + extra)}"

        # Name-mangled members (prefixed with "_<ClassName>") aren't
        # represented. Public members (those without a leading underscore)
        # aren't represented either, nor are the parent and aggregate
        # attributes.
        repr_attrs = {
            key[1:]: value
            for key, value in self.__dict__.items()
            if not (
                key.startswith(f"_{type(self).__name__}")
                or not key.startswith("_")
                or key == "_parent"
                or key == "_aggregate"
            )
        }

        # Build a representation of this playlist.
        for key, value in repr_attrs.items():
            # Skip representing this playlist's tracks.
            # Defer representation of the playlists attribute until the end.
            if key in ["playlists", "tracks"]:
                continue

            # Represent string values with surrounding double quotes.
            if isinstance(value, str):
                value = f'"{value}"'

            # Append the attribute's name and value to the representation.
            body += f"{key}={value}, "

        # Truncate the final attribute's trailing ", ". When sub-playlists
        # follow, only the space is dropped so the comma still separates the
        # last attribute from the playlists list.
        if not repr_attrs["playlists"]:
            body = body[:-2]
        else:
            body = body[:-1]

        # Now represent the playlist attribute as an indented list of
        # sub-playlists.
        for key, value in repr_attrs.items():
            if key != "playlists" or value is None:
                continue
            body += f"\n{padding + ' ' * 4 * (depth or 1)}{key}=["
            for val in value:
                # repr(val) re-enters this method one level deeper, which is
                # what increases the computed depth for the child.
                body += f"\n{' ' * 4 * depth}{repr(val)},"
            # Truncate final comma.
            body = body[:-1]
            body += f"\n{padding + ' ' * 4 * (depth or 1)}],"

        # Truncate final comma.
        if repr_attrs["playlists"]:
            body = body[:-1]

        return string.format(
            padding if depth else "",
            type(self).__name__,
            body,
            f"\n{padding}{' ' * 4 * (depth - 1)}" if self._playlists else "",
        )

    def __str__(self) -> str:
        """Produce a string representation of this playlist.

        Returns:
            The serialized NODE Tag of this playlist converted to a string.
        """
        return str(self.serialize())

    def get_name(self) -> str:
        """Returns the name of this playlist.

        Returns:
            The name of this playlist.
        """
        # _Name is set dynamically in __init__ from the Tag's Name attribute.
        return self._Name  # pylint: disable=no-member

    def is_folder(self) -> bool:
        """Returns whether this playlist is a folder or a playlist of tracks.

        Returns:
            Boolean representing whether this is a folder or not.
        """
        # _Type is set dynamically in __init__ from the Tag's Type attribute;
        # "0" marks a folder.
        return self._Type == "0"  # pylint: disable=no-member

    @classmethod
    def new_playlist(
        cls,
        name: str,
        playlists: Optional[List["RekordboxPlaylist"]] = None,
        tracks: Optional[Dict[str, RekordboxTrack]] = None,
        enable_aggregation: Optional[bool] = None,
    ) -> "RekordboxPlaylist":
        """Creates a new playlist.

        Args:
            name: The name of the Playlist to be created.
            playlists: A list of Playlists to add to this Playlist.
            tracks: A dict of Tracks to add to this Playlist.
            enable_aggregation: Whether or not this playlist has an aggregation
                playlist.

        Raises:
            RuntimeError: You must provide either a list of Playlists or a list
                of Tracks.
            RuntimeError: You must not provide both a list of Playlists and a
                list of Tracks.

        Returns:
            A new playlist.
        """
        if playlists is None and tracks is None:
            raise RuntimeError(
                "You must provide either a list of RekordboxPlaylists or a "
                "list of RekordboxTracks"
            )
        if playlists is not None and tracks is not None:
            raise RuntimeError(
                "You must not provide both a list of RekordboxPlaylists and a "
                "list of RekordboxTracks"
            )
        # Build a bare NODE Tag: a folder (Type "0") when playlists are
        # given, otherwise a track playlist (Type "1").
        playlist_tag = bs4.Tag(
            name="NODE",
            attrs=(
                {"Name": name, "Type": "0", "Count": len(playlists)}
                if playlists is not None
                else {
                    "Name": name,
                    "Type": "1",
                    "KeyType": "0",
                    "Entries": len(tracks),
                }
            ),
        )
        # enable_aggregation is forwarded to the base class through **kwargs.
        playlist = RekordboxPlaylist(
            playlist_tag,
            tracks=tracks,
            playlist_tracks=(tracks or {}).keys(),
            enable_aggregation=enable_aggregation,
        )
        # The Tag has no children, so the sub-playlists are attached directly.
        playlist._playlists = playlists

        return playlist

    def serialize(self, *args, **kwargs) -> bs4.element.Tag:
        """Serializes this playlist as a BeautifulSoup NODE Tag.

        Returns:
            BeautifulSoup Tag representing this playlist.
        """
        # BeautifulSoup Tag to populate with attributes of this playlist.
        playlist_tag = bs4.Tag(name="NODE", can_be_empty_element=True)

        # Name-mangled members (prefixed with "_<ClassName>") aren't
        # serialized. Public members (those without a leading underscore)
        # aren't serialized either, nor are the parent and aggregate
        # attributes.
        serialize_attrs = {
            key[1:]: value
            for key, value in self.__dict__.items()
            if not (
                key.startswith(f"_{type(self).__name__}")
                or not key.startswith("_")
                or key == "_parent"
                or key == "_aggregate"
            )
        }

        # Serialize attributes into a NODE Tag.
        for key, value in serialize_attrs.items():
            # Playlists and tracks are serialized as nested Tag objects.
            if key in ["playlists", "tracks"]:
                # None or empty containers produce no children.
                if not value:
                    continue

                # Iterate and serialize nested playlists.
                if self.is_folder():
                    for val in value:
                        playlist_tag.extend(
                            [bs4.NavigableString("\n"), val.serialize()]
                        )
                # Iterate and serialize tracks.
                else:
                    for val in value.values():
                        playlist_tag.extend(
                            [
                                bs4.NavigableString("\n"),
                                val.serialize(playlist=True),
                            ]
                        )

                # Append a final newline character.
                playlist_tag.append(bs4.NavigableString("\n"))
                continue

            # Otherwise the data is serialized as NODE Tag attributes.
            playlist_tag[key] = value

        # Update the Count or Entries attribute.
        # NOTE(review): len(self) relies on __len__, which is not defined in
        # this class -- presumably provided by the Playlist base; confirm.
        playlist_tag["Count" if self.is_folder() else "Entries"] = str(
            len(self)
        )

        return playlist_tag

__init__(playlist, *args, tracks=None, playlist_tracks=None, parent=None, **kwargs)

Deserialize a Playlist from a BeautifulSoup NODE Tag.

Parameters:

Name Type Description Default
playlist Tag

BeautifulSoup Tag representing a playlist.

required
tracks Dict[str, RekordboxTrack]

All the tracks in this collection.

None
playlist_tracks Optional[Dict[str, RekordboxTrack]]

Tracks to set when initializing with new_playlist.

None
parent Optional[RekordboxPlaylist]

The folder this playlist is in.

None
Source code in djtools/collection/rekordbox_playlist.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def __init__(
    self,
    playlist: bs4.element.Tag,
    *args,
    tracks: Optional[Dict[str, RekordboxTrack]] = None,
    playlist_tracks: Optional[Dict[str, RekordboxTrack]] = None,
    parent: Optional["RekordboxPlaylist"] = None,
    **kwargs,
):
    """Deserialize a Playlist from a BeautifulSoup NODE Tag.

    Every attribute of the NODE Tag is copied onto this object under an
    underscore-prefixed name (e.g. ``Name`` becomes ``_Name``). Folder
    nodes then recursively build their sub-playlists, while leaf nodes
    look their tracks up in ``tracks``.

    Args:
        playlist: BeautifulSoup Tag representing a playlist.
        tracks: All the tracks in this collection, keyed by track ID.
            Treated as empty when omitted.
        playlist_tracks: Tracks to set when initializing with
            new_playlist. It is only iterated for track IDs; when it is
            empty or omitted, the IDs are read from the "Key" attribute
            of the child Tags instead.
        parent: The folder this playlist is in.

    Raises:
        KeyError: A leaf playlist references a track ID that is not in
            ``tracks``.
    """
    super().__init__(*args, **kwargs)
    self._tracks = None
    self._playlists = None
    self._parent = parent
    tracks = tracks or {}

    # Set this object's attributes with the NODE Tag's attributes.
    for key, value in playlist.attrs.items():
        setattr(self, f"_{key}", value)

    # Recursively instantiate sub-playlists. Children that are not Tags
    # (such as the newline strings written between elements) are skipped.
    if self.is_folder():
        self._playlists = [
            RekordboxPlaylist(child, tracks=tracks, parent=self)
            for child in playlist.children
            if isinstance(child, bs4.element.Tag)
        ]
        return

    # Deserialize tracks from a leaf node playlist: fall back to the "Key"
    # attribute of each child Tag when no track IDs were supplied.
    if not playlist_tracks:
        playlist_tracks = [
            child.get("Key")
            for child in playlist.children
            if isinstance(child, bs4.element.Tag)
        ]

    # Create a dict of tracks.
    self._tracks = {
        track_id: tracks[track_id] for track_id in playlist_tracks
    }

__repr__()

Produces a string representation of this playlist.

Returns:

Type Description
str

Playlist represented as a string.

Source code in djtools/collection/rekordbox_playlist.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def __repr__(self) -> str:
    """Produces a string representation of this playlist.

    Scalar attributes are rendered inline as ``key=value`` pairs. The
    ``playlists`` attribute is rendered last as an indented list built
    from the repr of each sub-playlist, and ``tracks`` is never rendered.
    Indentation is derived from how many ``__repr__`` frames from
    ``rekordbox_playlist.py`` are currently on the call stack.

    Returns:
        Playlist represented as a string.
    """
    # Eventual repr string to return: padding, class name, body, and the
    # whitespace that precedes the closing parenthesis.
    string = "{}{}({}{})"
    # Body of the repr string to fill out with playlist contents.
    body = ""
    # Get the repr recursion depth to determine the degree of indentation.
    # Each inspect.stack() entry is a FrameInfo record where index 1 is the
    # filename and index 3 is the function name. The count includes this
    # call itself, hence the "- 1" so the outermost repr has depth 0.
    depth = (
        len(
            [
                frame
                for frame in inspect.stack()
                if frame[3] == "__repr__"
                and Path(frame[1]).name == "rekordbox_playlist.py"
            ]
        )
        - 1
    )
    # These variables are used to control indentation level.
    extra = 1 if depth else 0
    padding = f"{' ' * 4 * (depth + extra)}"

    # Name-mangled private members (prefixed with "_<ClassName>") aren't
    # represented. Public members (no leading underscore) aren't
    # represented either, nor are the parent and aggregate attributes.
    # The leading underscore is stripped from the remaining names.
    repr_attrs = {
        key[1:]: value
        for key, value in self.__dict__.items()
        if not (
            key.startswith(f"_{type(self).__name__}")
            or not key.startswith("_")
            or key == "_parent"
            or key == "_aggregate"
        )
    }

    # Build a representation of this playlist.
    for key, value in repr_attrs.items():
        # Skip representing this playlist's tracks.
        # Defer representation of the playlists attribute until the end.
        if key in ["playlists", "tracks"]:
            continue

        # Represent string values with surrounding double quotes.
        if isinstance(value, str):
            value = f'"{value}"'

        # Append the attribute's name and value to the representation.
        body += f"{key}={value}, "

    # Truncate the final attribute's trailing ", ". When sub-playlists
    # follow, only the space is dropped so the comma still separates the
    # last scalar attribute from the playlists list.
    if not repr_attrs["playlists"]:
        body = body[:-2]
    else:
        body = body[:-1]

    # Now represent the playlist attribute as an indented list of
    # sub-playlists.
    for key, value in repr_attrs.items():
        if key != "playlists" or value is None:
            continue
        body += f"\n{padding + ' ' * 4 * (depth or 1)}{key}=["
        for val in value:
            # repr(val) re-enters this method one stack level deeper, so
            # each sub-playlist computes its own, larger padding.
            body += f"\n{' ' * 4 * depth}{repr(val)},"
        # Truncate final comma.
        body = body[:-1]
        body += f"\n{padding + ' ' * 4 * (depth or 1)}],"

    # Truncate final comma.
    if repr_attrs["playlists"]:
        body = body[:-1]

    # The last placeholder puts the closing parenthesis on its own
    # indented line, but only when sub-playlists were rendered.
    return string.format(
        padding if depth else "",
        type(self).__name__,
        body,
        f"\n{padding}{' ' * 4 * (depth - 1)}" if self._playlists else "",
    )

__str__()

Produce a string representation of this playlist.

Returns:

Type Description
str

Playlist represented as a string.

Source code in djtools/collection/rekordbox_playlist.py
156
157
158
159
160
161
162
def __str__(self) -> str:
    """Render this playlist as the text of its serialized form.

    Returns:
        The string conversion of whatever `serialize` returns.
    """
    serialized = self.serialize()
    return str(serialized)

get_name()

Returns the name of this playlist.

Returns:

Type Description
str

The name of this playlist.

Source code in djtools/collection/rekordbox_playlist.py
164
165
166
167
168
169
170
def get_name(self) -> str:
    """Look up the name stored on this playlist.

    Returns:
        The value of this playlist's Name attribute.
    """
    name = self._Name  # pylint: disable=no-member
    return name

is_folder()

Returns whether this playlist is a folder or a playlist of tracks.

Returns:

Type Description
bool

Boolean representing whether this is a folder or not.

Source code in djtools/collection/rekordbox_playlist.py
172
173
174
175
176
177
178
def is_folder(self) -> bool:
    """Tell whether this playlist is a folder rather than a track list.

    Returns:
        True when this playlist's Type attribute equals the string "0".
    """
    node_type = self._Type  # pylint: disable=no-member
    return node_type == "0"

new_playlist(name, playlists=None, tracks=None, enable_aggregation=None) classmethod

Creates a new playlist.

Parameters:

Name Type Description Default
name str

The name of the Playlist to be created.

required
playlists Optional[List[RekordboxPlaylist]]

A list of Playlists to add to this Playlist.

None
tracks Optional[Dict[str, RekordboxTrack]]

A dict of Tracks to add to this Playlist.

None
enable_aggregation Optional[bool]

Whether or not this playlist has an aggregation playlist.

None

Raises:

Type Description
RuntimeError

You must provide either a list of Playlists or a list of Tracks.

RuntimeError

You must not provide both a list of Playlists and a list of Tracks.

Returns:

Type Description
RekordboxPlaylist

A new playlist.

Source code in djtools/collection/rekordbox_playlist.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
@classmethod
def new_playlist(
    cls,
    name: str,
    playlists: Optional[List["RekordboxPlaylist"]] = None,
    tracks: Optional[Dict[str, RekordboxTrack]] = None,
    enable_aggregation: Optional[bool] = None,
) -> "RekordboxPlaylist":
    """Build a playlist from scratch instead of from an existing NODE Tag.

    Exactly one of ``playlists`` and ``tracks`` must be given: passing
    ``playlists`` produces a folder (Type "0"), passing ``tracks``
    produces a playlist of tracks (Type "1").

    Args:
        name: The name of the Playlist to be created.
        playlists: A list of Playlists to add to this Playlist.
        tracks: A dict of Tracks to add to this Playlist.
        enable_aggregation: Whether or not this playlist has an aggregation
            playlist.

    Raises:
        RuntimeError: You must provide either a list of Playlists or a list
            of Tracks.
        RuntimeError: You must not provide both a list of Playlists and a
            list of Tracks.

    Returns:
        A new playlist.
    """
    has_playlists = playlists is not None
    has_tracks = tracks is not None

    if not has_playlists and not has_tracks:
        raise RuntimeError(
            "You must provide either a list of RekordboxPlaylists or a "
            "list of RekordboxTracks"
        )
    if has_playlists and has_tracks:
        raise RuntimeError(
            "You must not provide both a list of RekordboxPlaylists and a "
            "list of RekordboxTracks"
        )

    # Attributes of the NODE Tag differ between folders and track lists.
    if has_playlists:
        node_attrs = {"Name": name, "Type": "0", "Count": len(playlists)}
    else:
        node_attrs = {
            "Name": name,
            "Type": "1",
            "KeyType": "0",
            "Entries": len(tracks),
        }
    node = bs4.Tag(name="NODE", attrs=node_attrs)

    # The new Tag has no children, so the track IDs are handed over
    # explicitly and the sub-playlists are attached after construction.
    track_ids = (tracks or {}).keys()
    created = RekordboxPlaylist(
        node,
        tracks=tracks,
        playlist_tracks=track_ids,
        enable_aggregation=enable_aggregation,
    )
    created._playlists = playlists

    return created

serialize(*args, **kwargs)

Serializes this playlist as a BeautifulSoup NODE Tag.

Returns:

Type Description
Tag

BeautifulSoup Tag representing this playlist.

Source code in djtools/collection/rekordbox_playlist.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
def serialize(self, *args, **kwargs) -> bs4.element.Tag:
    """Build a BeautifulSoup NODE Tag out of this playlist.

    Scalar members become attributes of the Tag; sub-playlists (for a
    folder) or tracks (otherwise) become child Tags, each preceded by a
    newline string. The Count or Entries attribute is always rewritten
    from ``len(self)``.

    Returns:
        BeautifulSoup Tag representing this playlist.
    """
    node = bs4.Tag(name="NODE", can_be_empty_element=True)
    mangled_prefix = f"_{type(self).__name__}"
    never_serialized = ("_parent", "_aggregate")

    # Walk a snapshot of the instance members in their stored order.
    for member, content in list(self.__dict__.items()):
        # Public members (i.e. methods) and name-mangled private members
        # aren't serialized, nor are the parent and aggregate members.
        if not member.startswith("_"):
            continue
        if member.startswith(mangled_prefix) or member in never_serialized:
            continue

        field = member[1:]

        # Everything except playlists and tracks is a NODE Tag attribute.
        if field not in ("playlists", "tracks"):
            node[field] = content
            continue

        # Nothing to nest when the container is empty or unset.
        if not content:
            continue

        # Playlists and tracks are serialized as nested Tag objects.
        if self.is_folder():
            nested = [sub_playlist.serialize() for sub_playlist in content]
        else:
            nested = [
                track.serialize(playlist=True) for track in content.values()
            ]
        for child_tag in nested:
            node.extend([bs4.NavigableString("\n"), child_tag])

        # Close the run of children with a final newline character.
        node.append(bs4.NavigableString("\n"))

    # Update the Count or Entries attribute.
    size_attr = "Count" if self.is_folder() else "Entries"
    node[size_attr] = str(len(self))

    return node

RekordboxTrack

Bases: Track

Track implementation for usage with Rekordbox.

Source code in djtools/collection/rekordbox_track.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
class RekordboxTrack(Track):
    """Track implementation for usage with Rekordbox.

    Instances are deserialized from a BeautifulSoup TRACK Tag and can be
    turned back into one with `serialize`.
    """

    def __init__(self, track: bs4.element.Tag):
        """Deserialize a track from a BeautifulSoup TRACK Tag.

        Each TRACK attribute is stored on the instance under an
        underscore-prefixed name (e.g. ``Name`` becomes ``_Name``); a few
        attributes are converted to richer Python types first.

        Args:
            track: BeautifulSoup Tag representing a track.
        """
        super().__init__()
        # Prefix of the path to the audio file corresponding to this track.
        # It is stripped from Location here and re-inserted by `serialize`.
        self.__location_prefix = (
            "file://localhost" if os.name == "posix" else "file://localhost/"
        )

        # Set class attributes from TRACK Tag attributes.
        for key, value in track.attrs.items():
            # Numeric attributes are stored as ints.
            if key in [
                "BitRate",
                "DiscNumber",
                "PlayCount",
                "SampleRate",
                "Size",
                "TotalTime",
                "TrackNumber",
            ]:
                value = int(value)
            if key == "AverageBpm":
                value = float(value)
            if key == "DateAdded":
                # We need to keep the original date added string because
                # Rekordbox doesn't format date strings consistently i.e.
                # ensuring perfect serialization symmetry is not possible
                # without this.
                self.__original_date_added = value
                value = datetime.strptime(value, "%Y-%m-%d")
            if key == "Genre":
                # Genre tags are separated by forward slashes.
                value = [x.strip() for x in value.split("/")]
            if key == "Location":
                # Decode percent-escapes and drop the location prefix.
                value = Path(unquote(value).split(self.__location_prefix)[-1])
            if key == "Rating":
                # Map the Rekordbox rating value onto 0-5; any other value
                # becomes None.
                value = {
                    "0": 0,
                    "51": 1,
                    "102": 2,
                    "153": 3,
                    "204": 4,
                    "255": 5,
                }.get(value)
            setattr(self, f"_{key}", value)

        # Parse MyTag data from Comments attribute: the text between "/*"
        # and "*/", split on forward slashes.
        # NOTE(review): this and the Tags merge below assume the TRACK Tag
        # carries Comments and Genre attributes -- confirm.
        my_tags = re.search(r"(?<=\/\*).*(?=\*\/)", self._Comments)
        self._MyTags = (  # pylint: disable=invalid-name
            [x.strip() for x in my_tags.group().split("/")] if my_tags else []
        )

        # Merge Genre and MyTag data into a new attribute.
        self._Tags = self._Genre + self._MyTags  # pylint: disable=invalid-name

        # Parse TEMPO Tags as the beat grid attribute. Only the attribute
        # dicts of the Tags are kept.
        self._beat_grid = track.find_all("TEMPO")
        if self._beat_grid:
            self._beat_grid = [point.attrs for point in self._beat_grid]

        # Parse POSITION_MARK Tags as the hot cues attribute. Only the
        # attribute dicts of the Tags are kept.
        self._hot_cues = track.find_all("POSITION_MARK")
        if self._hot_cues:
            self._hot_cues = [hot_cue.attrs for hot_cue in self._hot_cues]

    def __repr__(self) -> str:
        """Produces a string representation of this track.

        Attributes are rendered as ``key=value`` pairs on indented lines;
        a new line is started once the current one exceeds `max_width`.

        Returns:
            Track represented as a string.
        """
        # Enforce a maximum width for a Track representation.
        max_width = 79
        # Eventual repr string to return.
        string = "{}(\n{}\n)"
        # Body of the repr string to fill out with track contents.
        body = " " * 4

        # Name-mangled private members (prefixed with "_<ClassName>")
        # aren't represented. Public members (i.e. methods) aren't
        # represented either. The leading underscore is stripped from the
        # remaining names.
        repr_attrs = {
            key[1:]: value
            for key, value in self.__dict__.items()
            if not (
                key.startswith(f"_{type(self).__name__}")
                or not key.startswith("_")
            )
        }

        # Build a representation of this track.
        for key, value in repr_attrs.items():
            # Prettify output by enforcing `max_width`: the check looks at
            # the text after the last newline, i.e. the current line.
            if len(body.split("\n", maxsplit=-1)[-1]) > max_width:
                body += f"\n{' ' * 4}"

            # Rather than display each beat grid or hot cue attribute, simply
            # represent as the number of those attributes.
            if key in ["beat_grid", "hot_cues"]:
                value = len(value)

            # Represent string values with surrounding double quotes.
            if isinstance(value, str):
                value = f'"{value}"'

            # Truncate the HH:MM:SS part of the datetime.
            if isinstance(value, datetime):
                value = f'"{value.strftime("%Y-%m-%d")}"'

            # Append the attribute's name and value to the representation.
            body += f"{key}={value}, "

        # Truncate final comma and space.
        body = body[:-2]

        return string.format(type(self).__name__, body)

    def __str__(self) -> str:
        """Produces a string representation of this track.

        Returns:
            The serialized TRACK Tag converted to a string.
        """
        return str(self.serialize())

    def get_artists(self) -> str:
        """Gets the track artists.

        Returns:
            A string representing the track's artists.
        """
        return self._Artist

    def get_bpm(self) -> float:
        """Gets the track BPM.

        Returns:
            A float representing BPM.
        """
        return self._AverageBpm

    def get_comments(self) -> str:
        """Gets the track comments.

        Returns:
            A string representing the track's comments.
        """
        return self._Comments

    def get_date_added(self) -> datetime:
        """Gets the track's date added.

        Returns:
            A datetime representing the track's date added.
        """
        return self._DateAdded

    def get_genre_tags(self) -> List[str]:
        """Gets the genre tags of the track.

        Returns:
            A list of the track's genre tags.
        """
        return self._Genre

    def get_id(self) -> str:
        """Get the track ID.

        Returns:
            The ID of this track.
        """
        return self._TrackID

    def get_key(self) -> Any:
        """Gets the track key.

        Returns:
            The key of this track.
        """
        return self._Tonality

    def get_label(self) -> Any:
        """Gets the track label.

        Returns:
            The label of this track.
        """
        return self._Label

    def get_location(self) -> Path:
        """Gets the location of the track.

        Returns:
            The Path for the location of the track.
        """
        return self._Location

    def get_rating(self) -> int:
        """Gets the rating of the track.

        Returns:
            The rating of the track.
        """
        return self._Rating

    def get_tags(self) -> List[str]:
        """Gets the tags of the track.

        Returns:
            A list of the track's tags: the genre tags followed by the
            tags parsed from the comments.
        """
        return self._Tags

    def get_year(self) -> str:
        """Gets the year of the track.

        Returns:
            The year of the track.
        """
        return self._Year

    def serialize(
        self, *args, playlist: bool = False, **kwargs
    ) -> bs4.element.Tag:
        """Serializes this track as a BeautifulSoup TRACK Tag.

        Args:
            playlist: Whether or not to serialize this track as a member of a
                playlist. When true, the Tag only carries a Key attribute
                holding the track ID.

        Raises:
            ValueError: The DateAdded attribute must serialize into its
                original format.

        Returns:
            BeautifulSoup Tag representing this track.
        """
        # BeautifulSoup Tag to populate with attributes of this track.
        track_tag = bs4.Tag(name="TRACK", can_be_empty_element=True)

        # TRACK Tags in playlists are serialized differently from top-level
        # TRACK Tags.
        if playlist:
            track_tag["Key"] = self.get_id()

            return track_tag

        # Name-mangled private members (prefixed with "_<ClassName>")
        # aren't serialized. Public members (i.e. methods) aren't
        # serialized either, nor are the derived MyTags and Tags members.
        serialize_attrs = {
            key[1:]: value
            for key, value in self.__dict__.items()
            if not (
                key.startswith(f"_{type(self).__name__}")
                or not key.startswith("_")
                or key in ["_MyTags", "_Tags"]
            )
        }

        # Serialize attributes into a TRACK Tag.
        for key, value in serialize_attrs.items():
            # Beat grid and hot cue data is serialized as TEMPO and
            # POSITION_MARK Tags, respectively.
            if key in ["beat_grid", "hot_cues"]:
                for val in value:
                    tag = bs4.Tag(
                        name="POSITION_MARK" if key == "hot_cues" else "TEMPO",
                        can_be_empty_element=True,
                    )
                    tag.attrs = val
                    track_tag.extend([bs4.NavigableString("\n"), tag])
                continue

            # Cast integers back into a string.
            if key in [
                "BitRate",
                "DiscNumber",
                "PlayCount",
                "SampleRate",
                "Size",
                "TotalTime",
                "TrackNumber",
            ]:
                value = str(value)

            # Format the BPM with two decimal places to make serialization
            # 100% symmetrical.
            if key == "AverageBpm":
                value = f"{value:0,.2f}"

            # Serialize datetimes back into the original date string.
            if isinstance(value, datetime):
                # Rekordbox doesn't consistently format dates with or without
                # leading zeros on the month and day portion of the date
                # string, so we have to try all three of these formats to see
                # if the resulting formatting matches the original one.
                # The unpadded directives differ by platform ("%-m" on
                # posix, "%#m" otherwise).
                # NOTE(review): a zero-padded month combined with an
                # unpadded day is not attempted -- confirm Rekordbox never
                # emits that form.
                date_formats = [
                    "%Y-%m-%d",
                    ("%Y-%-m-%d" if os.name == "posix" else "%Y-%#m-%d"),
                    ("%Y-%-m-%-d" if os.name == "posix" else "%Y-%#m-%#d"),
                ]
                for date_format in date_formats:
                    attempt = value.strftime(date_format)
                    if attempt == self.__original_date_added:
                        break
                # When no format matched, `attempt` holds the last format's
                # result and the check below raises.
                value = attempt

                if value != self.__original_date_added:
                    raise ValueError(  # pragma: no cover
                        f"Failed to serialize the datetime {value} into its "
                        f"original format {self.__original_date_added}"
                    )

            # Re-join genre tags with forward slashes.
            if key == "Genre":
                value = " / ".join(value)

            # Re-insert the location prefix and quote the path. The
            # percent-escapes produced by quoting are then lower-cased.
            if key == "Location":
                track_path = quote(value.as_posix(), safe="/,()!+=#;$:")
                value = f"{self.__location_prefix}{track_path}"
                value = re.sub(
                    r"%[0-9A-Z]{2}", lambda x: x.group(0).lower(), value
                )

            # Reverse the rating value to the range recognized by Rekordbox.
            if key == "Rating":
                value = {
                    0: "0",
                    1: "51",
                    2: "102",
                    3: "153",
                    4: "204",
                    5: "255",
                }.get(value)

            # Otherwise the data is serialized as TRACK Tag attributes.
            track_tag[key] = value

        # If this TRACK Tag has children, append a final newline character.
        if len(track_tag) > 1:
            track_tag.append(bs4.NavigableString("\n"))

        return track_tag

    @make_path
    def set_location(self, location: Path):
        """Sets the path of the track to location.

        Args:
            location: New location of the track.
        """
        self._Location = location  # pylint: disable=attribute-defined-outside-init,invalid-name

    def set_track_number(self, number: int):
        """Sets the track number of a track.

        Args:
            number: Number to set for TrackNumber.
        """
        self._TrackNumber = number  # pylint: disable=attribute-defined-outside-init,invalid-name

__init__(track)

Deserialize a track from a BeautifulSoup TRACK Tag.

Parameters:

Name Type Description Default
track Tag

BeautifulSoup Tag representing a track.

required
Source code in djtools/collection/rekordbox_track.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def __init__(self, track: bs4.element.Tag):
    """Populate this track from a BeautifulSoup TRACK Tag.

    Each TRACK attribute lands on the instance under an
    underscore-prefixed name; a few attributes are converted to richer
    Python types before being stored.

    Args:
        track: BeautifulSoup Tag representing a track.
    """
    super().__init__()
    # Prefix of the path to the audio file corresponding to this track.
    self.__location_prefix = (
        "file://localhost/" if os.name != "posix" else "file://localhost"
    )

    integer_fields = (
        "BitRate",
        "DiscNumber",
        "PlayCount",
        "SampleRate",
        "Size",
        "TotalTime",
        "TrackNumber",
    )
    rating_scale = {
        "0": 0,
        "51": 1,
        "102": 2,
        "153": 3,
        "204": 4,
        "255": 5,
    }

    # Copy the TRACK Tag attributes onto this object, converting as needed.
    for attr, raw in track.attrs.items():
        parsed = raw
        if attr in integer_fields:
            parsed = int(raw)
        elif attr == "AverageBpm":
            parsed = float(raw)
        elif attr == "DateAdded":
            # The original string is remembered because Rekordbox doesn't
            # format date strings consistently; without it the date could
            # not be written back out exactly as it was read.
            self.__original_date_added = raw
            parsed = datetime.strptime(raw, "%Y-%m-%d")
        elif attr == "Genre":
            parsed = [piece.strip() for piece in raw.split("/")]
        elif attr == "Location":
            decoded = unquote(raw)
            parsed = Path(decoded.split(self.__location_prefix)[-1])
        elif attr == "Rating":
            parsed = rating_scale.get(raw)
        setattr(self, f"_{attr}", parsed)

    # MyTag data lives in the Comments attribute between "/*" and "*/".
    tag_match = re.search(r"(?<=\/\*).*(?=\*\/)", self._Comments)
    if tag_match:
        found_tags = [piece.strip() for piece in tag_match.group().split("/")]
    else:
        found_tags = []
    self._MyTags = found_tags  # pylint: disable=invalid-name

    # Genre and MyTag data combined into one attribute.
    self._Tags = self._Genre + self._MyTags  # pylint: disable=invalid-name

    # The beat grid is the list of attribute dicts of the TEMPO Tags; an
    # empty search result is stored unchanged.
    tempo_tags = track.find_all("TEMPO")
    self._beat_grid = (
        [tempo.attrs for tempo in tempo_tags] if tempo_tags else tempo_tags
    )

    # The hot cues are the attribute dicts of the POSITION_MARK Tags; an
    # empty search result is stored unchanged.
    cue_tags = track.find_all("POSITION_MARK")
    self._hot_cues = [cue.attrs for cue in cue_tags] if cue_tags else cue_tags

__repr__()

Produces a string representation of this track.

Returns:

Type Description
str

Track represented as a string.

Source code in djtools/collection/rekordbox_track.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def __repr__(self) -> str:
    """Build a multi-line, human-readable summary of this track.

    Attributes are listed as ``key=value`` pairs on indented lines, and a
    fresh line is started whenever the current one has grown past the
    maximum width.

    Returns:
        Track represented as a string.
    """
    # Widest a line may get before the next attribute starts a new one.
    max_width = 79
    indent = " " * 4
    mangled_prefix = f"_{type(self).__name__}"
    body = indent

    for member, value in self.__dict__.items():
        # Public members (i.e. methods) and name-mangled private members
        # aren't represented.
        if not member.startswith("_") or member.startswith(mangled_prefix):
            continue
        label = member[1:]

        # Only the text after the last newline counts toward the width.
        if len(body.rpartition("\n")[2]) > max_width:
            body = f"{body}\n{indent}"

        if label in ("beat_grid", "hot_cues"):
            # Beat grids and hot cues are summarized by their count.
            shown = len(value)
        elif isinstance(value, str):
            # Strings are wrapped in double quotes.
            shown = f'"{value}"'
        elif isinstance(value, datetime):
            # Datetimes are shown as a quoted date without the time part.
            day = value.strftime("%Y-%m-%d")
            shown = f'"{day}"'
        else:
            shown = value

        body = f"{body}{label}={shown}, "

    # Drop the final comma and space before wrapping in the class name.
    return "{}(\n{}\n)".format(type(self).__name__, body[:-2])

__str__()

Produces a string representation of this track.

Returns:

Type Description
str

Track represented as a string.

Source code in djtools/collection/rekordbox_track.py
144
145
146
147
148
149
150
def __str__(self) -> str:
    """Render this track as the text of its serialized form.

    Returns:
        The string conversion of whatever `serialize` returns.
    """
    track_tag = self.serialize()
    return str(track_tag)

get_artists()

Gets the track artists.

Returns:

Type Description
str

A string representing the track's artists.

Source code in djtools/collection/rekordbox_track.py
152
153
154
155
156
157
158
def get_artists(self) -> str:
    """Look up the artists stored on this track.

    Returns:
        The value of this track's Artist attribute.
    """
    artists = self._Artist
    return artists

get_bpm()

Gets the track BPM.

Returns:

Type Description
float

A float representing BPM.

Source code in djtools/collection/rekordbox_track.py
160
161
162
163
164
165
166
def get_bpm(self) -> float:
    """Look up the BPM stored on this track.

    Returns:
        The value of this track's AverageBpm attribute.
    """
    bpm = self._AverageBpm
    return bpm

get_comments()

Gets the track comments.

Returns:

Type Description
str

A string representing the track's comments.

Source code in djtools/collection/rekordbox_track.py
168
169
170
171
172
173
174
def get_comments(self) -> str:
    """Look up the comments stored on this track.

    Returns:
        The value of this track's Comments attribute.
    """
    comments = self._Comments
    return comments

get_date_added()

Gets the track's date added.

Returns:

Type Description
datetime

A datetime representing the track's date added.

Source code in djtools/collection/rekordbox_track.py
176
177
178
179
180
181
182
def get_date_added(self) -> datetime:
    """Gets the track's date added.

    Returns:
        A datetime representing the track's date added.
    """
    return self._DateAdded

get_genre_tags()

Gets the genre tags of the track.

Returns:

Type Description
List[str]

A list of the track's genre tags.

Source code in djtools/collection/rekordbox_track.py
184
185
186
187
188
189
190
def get_genre_tags(self) -> List[str]:
    """Look up the genre tags stored on this track.

    Returns:
        The value of this track's Genre attribute.
    """
    genre_tags = self._Genre
    return genre_tags

get_id()

Get the track ID.

Returns:

Type Description
str

The ID of this track.

Source code in djtools/collection/rekordbox_track.py
192
193
194
195
196
197
198
def get_id(self) -> str:
    """Returns the identifier of this track.

    Returns:
        The track's ID.
    """
    track_id = self._TrackID
    return track_id

get_key()

Gets the track key.

Returns:

Type Description
Any

The key of this track.

Source code in djtools/collection/rekordbox_track.py
200
201
202
203
204
205
206
def get_key(self) -> Any:
    """Returns the musical key stored for this track.

    Returns:
        The track's key.
    """
    key = self._Tonality
    return key

get_label()

Gets the track label.

Returns:

Type Description
Any

The label of this track.

Source code in djtools/collection/rekordbox_track.py
208
209
210
211
212
213
214
def get_label(self) -> Any:
    """Returns the record label stored for this track.

    Returns:
        The track's label.
    """
    label = self._Label
    return label

get_location()

Gets the location of the track.

Returns:

Type Description
Path

The Path for the location of the track.

Source code in djtools/collection/rekordbox_track.py
216
217
218
219
220
221
222
def get_location(self) -> Path:
    """Returns where this track's file is located.

    Returns:
        The Path pointing at the track's location.
    """
    location = self._Location
    return location

get_rating()

Gets the rating of the track.

Returns:

Type Description
int

The rating of the track.

Source code in djtools/collection/rekordbox_track.py
224
225
226
227
228
229
230
def get_rating(self) -> int:
    """Returns the rating stored for this track.

    Returns:
        The track's rating as an integer.
    """
    rating = self._Rating
    return rating

get_tags()

Gets the tags of the track.

Returns:

Type Description
List[str]

A set of the track's tags.

Source code in djtools/collection/rekordbox_track.py
232
233
234
235
236
237
238
def get_tags(self) -> List[str]:
    """Returns the tags attached to this track.

    Returns:
        The track's tags.
    """
    tags = self._Tags
    return tags

get_year()

Gets the year of the track.

Returns:

Type Description
str

The year of the track.

Source code in djtools/collection/rekordbox_track.py
240
241
242
243
244
245
246
def get_year(self) -> str:
    """Returns the year stored for this track.

    Returns:
        The track's year as a string.
    """
    year = self._Year
    return year

serialize(*args, playlist=False, **kwargs)

Serializes this track as a BeautifulSoup TRACK Tag.

Parameters:

Name Type Description Default
playlist bool

Whether or not to serialize this track as a member of a playlist.

False

Raises:

Type Description
ValueError

The DateAdded attribute must serialize into its original format.

Returns:

Type Description
Tag

BeautifulSoup Tag representing this track.

Source code in djtools/collection/rekordbox_track.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
def serialize(
    self, *args, playlist: bool = False, **kwargs
) -> bs4.element.Tag:
    """Serializes this track as a BeautifulSoup TRACK Tag.

    When serialized as a playlist member, the Tag carries only a "Key"
    attribute holding the track ID. Otherwise every serializable attribute of
    this track is converted back into its string form and written as a TRACK
    Tag attribute, with beat grid and hot cue data written as child Tags.

    Args:
        *args: Accepted but not used by this implementation.
        playlist: Whether or not to serialize this track as a member of a
            playlist.
        **kwargs: Accepted but not used by this implementation.

    Raises:
        ValueError: The DateAdded attribute must serialize into its
            original format.

    Returns:
        BeautifulSoup Tag representing this track.
    """
    # BeautifulSoup Tag to populate with attributes of this track.
    track_tag = bs4.Tag(name="TRACK", can_be_empty_element=True)

    # TRACK Tags in playlists are serialized differently from top-level
    # TRACK Tags: they only reference the track by its ID.
    if playlist:
        track_tag["Key"] = self.get_id()

        return track_tag

    # Select the instance attributes to serialize, dropping the leading
    # underscore from each name. Skipped are:
    #   - name-mangled private members (stored as "_<ClassName>__name"),
    #   - public members (no leading underscore),
    #   - the "_MyTags" and "_Tags" attributes.
    serialize_attrs = {
        key[1:]: value
        for key, value in self.__dict__.items()
        if not (
            key.startswith(f"_{type(self).__name__}")
            or not key.startswith("_")
            or key in ["_MyTags", "_Tags"]
        )
    }

    # Serialize attributes into a TRACK Tag.
    for key, value in serialize_attrs.items():
        # Beat grid and hot cue data is serialized as TEMPO and
        # POSITION_MARK child Tags, respectively, each preceded by a newline.
        # These keys never become attributes of the TRACK Tag itself.
        if key in ["beat_grid", "hot_cues"]:
            for val in value:
                tag = bs4.Tag(
                    name="POSITION_MARK" if key == "hot_cues" else "TEMPO",
                    can_be_empty_element=True,
                )
                tag.attrs = val
                track_tag.extend([bs4.NavigableString("\n"), tag])
            continue

        # Cast integers back into a string.
        if key in [
            "BitRate",
            "DiscNumber",
            "PlayCount",
            "SampleRate",
            "Size",
            "TotalTime",
            "TrackNumber",
        ]:
            value = str(value)

        # Format BPM with two decimal places to make serialization 100%
        # symmetrical. Note the format spec also enables a comma thousands
        # separator.
        if key == "AverageBpm":
            value = f"{value:0,.2f}"

        # Truncate the HH:MM:SS part of the datetime.
        if isinstance(value, datetime):
            # Rekordbox doesn't consistently format dates with or without
            # leading zeros on the month and day portion of the date
            # string, so we have to try all three of these formats to see
            # if the resulting formatting matches the original one. The
            # directive for "no leading zero" differs between POSIX ("%-m")
            # and other platforms ("%#m").
            date_formats = [
                "%Y-%m-%d",
                ("%Y-%-m-%d" if os.name == "posix" else "%Y-%#m-%d"),
                ("%Y-%-m-%-d" if os.name == "posix" else "%Y-%#m-%#d"),
            ]
            # Stop at the first format that reproduces the original string;
            # if none match, "attempt" holds the last format tried.
            for date_format in date_formats:
                attempt = value.strftime(date_format)
                if attempt == self.__original_date_added:
                    break
            value = attempt

            # Refuse to write a date that differs from what was read.
            if value != self.__original_date_added:
                raise ValueError(  # pragma: no cover
                    f"Failed to serialize the datetime {value} into its "
                    f"original format {self.__original_date_added}"
                )

        # Re-join genre tags with forward slashes.
        if key == "Genre":
            value = " / ".join(value)

        # Re-insert the location prefix and quote the path. The characters
        # listed in "safe" are left unquoted, and the hex digits of every
        # percent-escape are lowercased afterwards.
        if key == "Location":
            track_path = quote(value.as_posix(), safe="/,()!+=#;$:")
            value = f"{self.__location_prefix}{track_path}"
            value = re.sub(
                r"%[0-9A-Z]{2}", lambda x: x.group(0).lower(), value
            )

        # Reverse the rating value to the range recognized by Rekordbox.
        # A rating outside 0-5 maps to None because of dict.get().
        if key == "Rating":
            value = {
                0: "0",
                1: "51",
                2: "102",
                3: "153",
                4: "204",
                5: "255",
            }.get(value)

        # Otherwise the data is serialized as TRACK Tag attributes.
        track_tag[key] = value

    # If this TRACK Tag has children, append a final newline character.
    if len(track_tag) > 1:
        track_tag.append(bs4.NavigableString("\n"))

    return track_tag

set_location(location)

Sets the path of the track to location.

Parameters:

Name Type Description Default
location Path

New location of the track.

required
Source code in djtools/collection/rekordbox_track.py
371
372
373
374
375
376
377
378
@make_path
def set_location(self, location: Path):
    """Points this track at a new file location.

    Args:
        location: Path to store as the track's location.
    """
    new_location = location
    self._Location = new_location  # pylint: disable=attribute-defined-outside-init,invalid-name

set_track_number(number)

Sets the track number of a track.

Parameters:

Name Type Description Default
number int

Number to set for TrackNumber.

required
Source code in djtools/collection/rekordbox_track.py
380
381
382
383
384
385
386
def set_track_number(self, number: int):
    """Stores a new track number on this track.

    Args:
        number: Value to store as the track's TrackNumber.
    """
    track_number = number
    self._TrackNumber = track_number  # pylint: disable=attribute-defined-outside-init,invalid-name

collection_playlists(config, path=None)

Builds playlists automatically.

By maintaining a collection with tracks having tag data (e.g. genre tags, Rekordbox's "My Tags", etc.) and providing a playlist config which specifies a desired playlist structure based around these tags, users can automatically generate that playlist structure.

The playlist config is a YAML file which specifies a nested structure of folders. Each folder is declared with a "name" and a list of "playlists" which may be either more folder declarations or else strings matching a tag in your collection.

Any folder that has more than one playlist within it will automatically have an "All <folder name>" playlist added to it that contains the set of tracks from all the other playlists in that folder.

Any tag in your collection that is not specified in the playlist config will automatically be added to either an "Unused Tags" folder of playlists or an "Unused Tags" playlist (depending on your configured choice for collection_playlists_remainder).

A special folder with the name "_ignore" may be included anywhere within the "tags" specification with playlists matching the set of tags to ignore when creating the "Unused Tags" folder / playlist.

In addition to creating playlists from tags, this function also supports creating "combiner" playlists by evaluating boolean algebra expressions. This is an incredibly powerful feature which allows users to apply set operations {union, intersection, and difference} to a diverse range of operands {tag, playlists, BPMs, ratings, etc.}.

Combiner playlists are declared in the "combiner" specification of the playlist config with playlists whose names are the boolean algebra expressions used to construct them.

Here's an example combiner playlist to illustrate this:

((Dubstep ~ [1-3]) | {playlist: My Favorites} | (*Techno & [135-145])) & Dark

The resulting combiner playlist will be comprised of tracks that are:

  • tagged as "Dubstep" but NOT having a rating less than 4
  • OR in the playlist called "My Favorites"
  • OR tagged as something ending with "Techno" AND in the BPM range of 135 to 145
  • AND tagged as "Dark"

Parameters:

Name Type Description Default
config BaseConfig

Configuration object.

required
path Optional[Path]

Path to write the new collection to.

None
Source code in djtools/collection/playlist_builder.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
@make_path
def collection_playlists(config: BaseConfig, path: Optional[Path] = None):
    """Builds playlists automatically.

    By maintaining a collection with tracks having tag data (e.g. genre tags,
    Rekordbox's "My Tags", etc.) and providing a playlist config which
    specifies a desired playlist structure based around these tags, users can
    automatically generate that playlist structure.

    The playlist config is a YAML file which specifies a nested structure of
    folders. Each folder is declared with a "name" and a list of "playlists"
    which may be either more folder declarations or else strings matching a tag
    in your collection.

    Any folder that has more than one playlist within it will automatically
    have an "All <folder name>" playlist added to it that contains the set of
    tracks from all the other playlists in that folder.

    Any tag in your collection that is not specified in the playlist config
    will automatically be added to either an "Unused Tags" folder of playlists
    or an "Unused Tags" playlist (depending on your configured choice for
    collection_playlists_remainder).

    A special folder with the name "_ignore" may be included anywhere within
    the "tags" specification with playlists matching the set of tags to ignore
    when creating the "Unused Tags" folder / playlist.

    In addition to creating playlists from tags, this function also supports
    creating "combiner" playlists by evaluating boolean algebra expressions.
    This is an incredibly powerful feature which allows users to apply set
    operations {union, intersection, and difference} to a diverse range of
    operands {tag, playlists, BPMs, ratings, etc.}.

    Combiner playlists are declared in the "combiner" specification of the
    playlist config with playlists whose names are the boolean algebra
    expressions used to construct them.

    Here's an example combiner playlist to illustrate this:

        ((Dubstep ~ [1-3]) | {playlist: My Favorites} | (*Techno & [135-145])) & Dark

    The resulting combiner playlist will be comprised of tracks that are:

    - tagged as "Dubstep" but NOT having a rating less than 4
    - OR in the playlist called "My Favorites"
    - OR tagged as something ending with "Techno" AND in the BPM range of 135
    to 145
    - AND tagged as "Dark"

    If the playlist config is missing or has neither a "tags" nor a "combiner"
    section, a warning is logged and nothing is built or written.

    Args:
        config: Configuration object.
        path: Path to write the new collection to.
    """
    # Check if the playlist config is populated before continuing. The
    # playlist config is optional on the collection config, so a missing one
    # is handled the same way as an empty one instead of failing on attribute
    # access.
    playlist_config = config.collection.playlist_config
    if playlist_config is None or not (
        playlist_config.tags or playlist_config.combiner
    ):
        logger.warning(
            "Not building playlists because the playlist config is empty."
        )
        return

    # Load the collection.
    collection = PLATFORM_REGISTRY[config.collection.platform]["collection"](
        path=config.collection.collection_path
    )

    # Get the Playlist implementation to use for this collection.
    playlist_class = PLATFORM_REGISTRY[config.collection.platform]["playlist"]

    # Required number of tracks to make tag and combiner playlists.
    minimum_tag_tracks = config.collection.minimum_tag_playlist_tracks
    minimum_combiner_tracks = (
        config.collection.minimum_combiner_playlist_tracks
    )

    # Create a dict of tracks keyed by their individual tags.
    tags_tracks = defaultdict(dict)
    for track_id, track in collection.get_tracks().items():
        for tag in track.get_tags():
            tags_tracks[tag][track_id] = track

    # This will hold the playlists being built.
    auto_playlists = []

    # List of PlaylistFilter implementations to run against built playlists.
    filters = [
        getattr(playlist_filters, playlist_filter.value)()
        for playlist_filter in config.collection.collection_playlist_filters
    ]

    # Create playlists for the "tags" portion of the playlist config.
    if playlist_config.tags:
        # A set of tags seen is maintained while creating the tags playlists so
        # that they are ignored when creating the "Unused Tags" playlists.
        seen_tags = set()
        tag_playlists = build_tag_playlists(
            playlist_config.tags,
            tags_tracks,
            playlist_class,
            seen_tags,
            minimum_tracks=minimum_tag_tracks,
        )

        # The tag playlists must have their "parent" attribute set so that
        # PlaylistFilter implementations may apply logic that depends on the
        # relative position of the playlist with the playlist tree.
        tag_playlists.set_parent()

        # Apply the filtering logic of the configured PlaylistFilter implementations.
        filter_tag_playlists(tag_playlists, filters)

        # Recursively traverse the playlist tree and create "all" playlists
        # within each folder containing more than one playlist. These "all"
        # playlists aggregate the set of tracks contained within all the other
        # playlists within the same folder.
        _ = aggregate_playlists(
            tag_playlists, playlist_class, minimum_tag_tracks
        )

        auto_playlists.extend(tag_playlists)

        # Identify the set of tags that did not appear in the playlist config
        # and create either an "Unused Tags" folder of playlists or simply an
        # "Unused Tags" playlist.
        other_tags = sorted(set(tags_tracks).difference(seen_tags))
        if (
            config.collection.collection_playlists_remainder
            == PlaylistRemainder.FOLDER
        ):
            auto_playlists.append(
                build_tag_playlists(
                    PlaylistConfigContent(
                        name="Unused Tags", playlists=other_tags
                    ),
                    tags_tracks,
                    playlist_class,
                    minimum_tracks=minimum_tag_tracks,
                )
            )
        else:
            # Membership is tested once per tag against a set; iteration still
            # follows tags_tracks so the resulting track order is unchanged.
            other_tag_set = set(other_tags)
            auto_playlists.append(
                build_tag_playlists(
                    "Unused Tags",
                    {
                        "Unused Tags": {
                            track_id: track
                            for tag, track_dict in tags_tracks.items()
                            if tag in other_tag_set
                            for track_id, track in track_dict.items()
                        }
                    },
                    playlist_class,
                    minimum_tracks=minimum_tag_tracks,
                )
            )

    # Create playlists for the "combiner" portion of the playlist config.
    if playlist_config.combiner:
        # Parse selectors from the combiner playlist names and update the
        # tags_tracks mapping.
        add_selectors_to_tags(
            playlist_config.combiner,
            tags_tracks,
            collection,
            auto_playlists,
        )

        # Evaluate the boolean logic of the combiner playlists.
        combiner_playlists = build_combiner_playlists(
            playlist_config.combiner,
            tags_tracks,
            playlist_class,
            minimum_tracks=minimum_combiner_tracks,
        )

        # The combiner playlists must have their "parent" attribute set so
        # that PlaylistFilter implementations may apply logic that depends on
        # the relative position of the playlist with the playlist tree.
        combiner_playlists.set_parent()

        # Apply the filtering logic of the configured PlaylistFilter implementations.
        filter_tag_playlists(combiner_playlists, filters)

        # Recursively traverse the playlist tree and create "all" playlists
        # within each folder containing more than one playlist. These "all"
        # playlists aggregate the set of tracks contained within all the other
        # playlists within the same folder.
        _ = aggregate_playlists(
            combiner_playlists, playlist_class, minimum_combiner_tracks
        )

        auto_playlists.extend(combiner_playlists)

        # Print tag statistics for each combiner playlist.
        if config.verbosity and combiner_playlists:
            print_playlists_tag_statistics(combiner_playlists)

    # Remove any previous playlist builder playlists.
    previous_playlists = collection.get_playlists(name=PLAYLIST_NAME)
    root = collection.get_playlists()
    for playlist in previous_playlists:
        root.remove_playlist(playlist)

    # Insert a new playlist containing the built playlists.
    auto_playlist = playlist_class.new_playlist(
        name=PLAYLIST_NAME, playlists=auto_playlists
    )
    auto_playlist.set_parent(collection.get_playlists())
    collection.add_playlist(auto_playlist)
    collection.serialize(path=path)

    num_playlists = collection.get_playlists().get_number_of_playlists()
    logger.info(f"{PLAYLIST_NAME} generated with {num_playlists} playlists")

This module contains the configuration objects for the collection package. The attributes of this configuration object correspond with the "collection" key of config.yaml.

CollectionConfig

Bases: BaseConfigFormatter

Configuration object for the collection package.

Source code in djtools/collection/config.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
class CollectionConfig(BaseConfigFormatter):
    """Configuration object for the collection package."""

    # Location of the collection to operate on; required (and must exist)
    # whenever any of the collection features below is enabled.
    collection_path: Optional[Path] = None
    # Names of PlaylistFilter implementations to apply to built playlists.
    collection_playlist_filters: List[PlaylistFilters] = []
    # Enables the playlist builder and the loading of its playlist config.
    collection_playlists: bool = False
    # Whether unused tags go into a folder of playlists or a single playlist.
    collection_playlists_remainder: PlaylistRemainder = (
        PlaylistRemainder.FOLDER
    )
    copy_playlists: List[str] = []
    copy_playlists_destination: Optional[Path] = None
    minimum_combiner_playlist_tracks: Optional[PositiveInt] = None
    minimum_tag_playlist_tracks: Optional[PositiveInt] = None
    platform: RegisteredPlatforms = RegisteredPlatforms.REKORDBOX
    shuffle_playlists: List[str] = []
    # Populated by the constructor only when collection_playlists is enabled.
    playlist_config: Optional["PlaylistConfig"] = None

    def __init__(self, *args, **kwargs):
        """Constructor.

        When collection_playlists is enabled, the playlist config is loaded
        from collection_playlists.yaml in the package's "configs" directory.
        If a collection_playlists.j2 template exists in "playlist_templates",
        it is rendered first and written over collection_playlists.yaml.

        Raises:
            RuntimeError: Using the collection package requires
                collection_path to be set and to exist.
            RuntimeError: Failed to render collection_playlists.j2.
            RuntimeError: collection_playlists.yaml must exist.
            RuntimeError: collection_playlists.yaml must be a valid YAML file
                matching the PlaylistConfig schema.
        """
        super().__init__(*args, **kwargs)

        if any(
            [
                self.collection_playlists,
                self.copy_playlists,
                self.shuffle_playlists,
            ]
        ) and (not self.collection_path or not self.collection_path.exists()):
            raise RuntimeError(
                "Using the collection package requires the config option "
                "collection_path to be a valid collection path"
            )

        if self.collection_playlists:
            config_path = Path(__file__).parent.parent / "configs"
            env = Environment(
                loader=FileSystemLoader(config_path / "playlist_templates")
            )
            playlist_template = None
            playlist_template_name = "collection_playlists.j2"
            playlist_config_path = config_path / "collection_playlists.yaml"

            # The template is optional; without it the YAML file is used
            # as-is.
            try:
                playlist_template = env.get_template(playlist_template_name)
            except TemplateNotFound:
                pass

            if playlist_template:
                try:
                    playlist_config = playlist_template.render()
                except Exception as exc:
                    raise RuntimeError(
                        f"Failed to render {playlist_template_name}: {exc}"
                    ) from exc

                if playlist_config_path.exists():
                    logger.warning(
                        f"Both {playlist_template_name} and "
                        f"{playlist_config_path.name} exist. Overwriting "
                        f"{playlist_config_path.name} with the rendered "
                        "template"
                    )

                with open(
                    playlist_config_path, mode="w", encoding="utf-8"
                ) as _file:
                    _file.write(playlist_config)

            if not playlist_config_path.exists():
                raise RuntimeError(
                    "collection_playlists.yaml must exist to use the "
                    "collection_playlists feature"
                )

            # NOTE(review): yaml.load with FullLoader is used on this file;
            # consider yaml.safe_load if the playlist config could ever come
            # from an untrusted source.
            try:
                with open(
                    playlist_config_path, mode="r", encoding="utf-8"
                ) as _file:
                    # An empty YAML document loads as None, hence the "or {}".
                    self.playlist_config = PlaylistConfig(
                        **yaml.load(_file, Loader=yaml.FullLoader) or {}
                    )
            # YAML syntax errors are wrapped as well so that malformed files
            # surface as the documented RuntimeError.
            except (ValidationError, yaml.YAMLError) as exc:
                raise RuntimeError(
                    "collection_playlists.yaml must be a valid YAML to use "
                    "the collection_playlists feature"
                ) from exc

__init__(*args, **kwargs)

Constructor.

Raises:

Type Description
RuntimeError

Using the collection package requires a valid collection_path.

RuntimeError

Failed to render collection_playlists.yaml from template.

RuntimeError

collection_path must be a valid collection path.

RuntimeError

collection_playlists.yaml must be a valid YAML file.

Source code in djtools/collection/config.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
def __init__(self, *args, **kwargs):
    """Constructor.

    When collection_playlists is enabled, the playlist config is loaded from
    collection_playlists.yaml in the package's "configs" directory. If a
    collection_playlists.j2 template exists in "playlist_templates", it is
    rendered first and written over collection_playlists.yaml.

    Raises:
        RuntimeError: Using the collection package requires collection_path
            to be set and to exist.
        RuntimeError: Failed to render collection_playlists.j2.
        RuntimeError: collection_playlists.yaml must exist.
        RuntimeError: collection_playlists.yaml must be a valid YAML file
            matching the PlaylistConfig schema.
    """
    super().__init__(*args, **kwargs)

    if any(
        [
            self.collection_playlists,
            self.copy_playlists,
            self.shuffle_playlists,
        ]
    ) and (not self.collection_path or not self.collection_path.exists()):
        raise RuntimeError(
            "Using the collection package requires the config option "
            "collection_path to be a valid collection path"
        )

    if self.collection_playlists:
        config_path = Path(__file__).parent.parent / "configs"
        env = Environment(
            loader=FileSystemLoader(config_path / "playlist_templates")
        )
        playlist_template = None
        playlist_template_name = "collection_playlists.j2"
        playlist_config_path = config_path / "collection_playlists.yaml"

        # The template is optional; without it the YAML file is used as-is.
        try:
            playlist_template = env.get_template(playlist_template_name)
        except TemplateNotFound:
            pass

        if playlist_template:
            try:
                playlist_config = playlist_template.render()
            except Exception as exc:
                raise RuntimeError(
                    f"Failed to render {playlist_template_name}: {exc}"
                ) from exc

            if playlist_config_path.exists():
                logger.warning(
                    f"Both {playlist_template_name} and "
                    f"{playlist_config_path.name} exist. Overwriting "
                    f"{playlist_config_path.name} with the rendered "
                    "template"
                )

            with open(
                playlist_config_path, mode="w", encoding="utf-8"
            ) as _file:
                _file.write(playlist_config)

        if not playlist_config_path.exists():
            raise RuntimeError(
                "collection_playlists.yaml must exist to use the "
                "collection_playlists feature"
            )

        # NOTE(review): yaml.load with FullLoader is used on this file;
        # consider yaml.safe_load if the playlist config could ever come from
        # an untrusted source.
        try:
            with open(
                playlist_config_path, mode="r", encoding="utf-8"
            ) as _file:
                # An empty YAML document loads as None, hence the "or {}".
                self.playlist_config = PlaylistConfig(
                    **yaml.load(_file, Loader=yaml.FullLoader) or {}
                )
        # YAML syntax errors are wrapped as well so that malformed files
        # surface as the documented RuntimeError.
        except (ValidationError, yaml.YAMLError) as exc:
            raise RuntimeError(
                "collection_playlists.yaml must be a valid YAML to use "
                "the collection_playlists feature"
            ) from exc

PlaylistConfig

Bases: BaseModel

A class for type checking the playlist config YAML.

Source code in djtools/collection/config.py
197
198
199
200
class PlaylistConfig(BaseModel, extra="forbid"):
    """A class for type checking the playlist config YAML.

    Both sections are optional; keys other than these are rejected
    (extra="forbid").
    """
    # Structure used to build "combiner" playlists.
    combiner: Optional[PlaylistConfigContent] = None
    # Structure used to build tag playlists.
    tags: Optional[PlaylistConfigContent] = None

PlaylistConfigContent

Bases: BaseModel

A class for type checking the content of the playlist config YAML.

Source code in djtools/collection/config.py
190
191
192
193
194
class PlaylistConfigContent(BaseModel, extra="forbid"):
    """A class for type checking the content of the playlist config YAML.

    Represents one folder of the playlist structure; folders nest by placing
    further PlaylistConfigContent entries inside "playlists".
    """
    # Name of the folder.
    name: str
    # Entries of the folder: nested folders, PlaylistName objects, or plain
    # strings.
    playlists: List[Union["PlaylistConfigContent", PlaylistName, str]]
    # NOTE(review): presumably toggles the aggregate "All" playlist for this
    # folder -- confirm against the playlist builder.
    enable_aggregation: Optional[bool] = None

PlaylistFilters

Bases: Enum

PlaylistFilters enum.

Source code in djtools/collection/config.py
21
22
23
24
25
26
27
class PlaylistFilters(Enum):
    """PlaylistFilters enum.

    Each value is the name of an attribute looked up on the playlist_filters
    module and instantiated by the playlist builder.
    """

    COMPLEX_TRACK_FILTER = "ComplexTrackFilter"
    HIPHOP_FILTER = "HipHopFilter"
    MINIMAL_DEEP_TECH_FILTER = "MinimalDeepTechFilter"
    TRANSITION_TRACK_FILTER = "TransitionTrackFilter"

PlaylistName

Bases: BaseModel

A class for configuring the names of playlists.

Source code in djtools/collection/config.py
184
185
186
187
class PlaylistName(BaseModel, extra="forbid"):
    """A class for configuring the names of playlists.

    Keys other than the ones declared here are rejected (extra="forbid").
    """
    # NOTE(review): presumably the tag text the playlist is built from --
    # confirm against the playlist builder.
    tag_content: str
    # Optional; NOTE(review): presumably the display name overriding
    # tag_content -- confirm against the playlist builder.
    name: Optional[str] = None

PlaylistRemainder

Bases: Enum

PlaylistRemainder enum.

Source code in djtools/collection/config.py
46
47
48
49
50
class PlaylistRemainder(Enum):
    """PlaylistRemainder enum.

    Selects how tags missing from the playlist config are collected by the
    playlist builder.
    """

    # Build a folder containing playlists for the unused tags.
    FOLDER = "folder"
    # Build a single playlist holding the tracks of all unused tags.
    PLAYLIST = "playlist"

RegisteredPlatforms

Bases: Enum

RegisteredPlatforms enum.

Source code in djtools/collection/config.py
67
68
69
70
class RegisteredPlatforms(Enum):
    """RegisteredPlatforms enum.

    Rekordbox is the only member defined here.
    """

    # NOTE(review): presumably used as the key into PLATFORM_REGISTRY when
    # selecting Collection / Playlist implementations -- confirm.
    REKORDBOX = "rekordbox"

This module is used to automatically generate a playlist structure.

collection_playlists(config, path=None)

Builds playlists automatically.

By maintaining a collection with tracks having tag data (e.g. genre tags, Rekordbox's "My Tags", etc.) and providing a playlist config which specifies a desired playlist structure based around these tags, users can automatically generate that playlist structure.

The playlist config is a YAML file which specifies a nested structure of folders. Each folder is declared with a "name" and a list of "playlists" which may be either more folder declarations or else strings matching a tag in your collection.

Any folder that has more than one playlist within it will automatically have an "All <folder name>" playlist added to it that contains the set of tracks from all the other playlists in that folder.

Any tag in your collection that is not specified in the playlist config will automatically be added to either an "Unused Tags" folder of playlists or a single "Unused Tags" playlist (depending on your configured choice for collection_playlists_remainder).

A special folder with the name "_ignore" may be included anywhere within the "tags" specification with playlists matching the set of tags to ignore when creating the "Unused Tags" folder / playlist.

In addition to creating playlists from tags, this function also supports creating "combiner" playlists by evaluating boolean algebra expressions. This is an incredibly powerful feature which allows users to apply set operations {union, intersection, and difference} to a diverse range of operands {tag, playlists, BPMs, ratings, etc.}.

Combiner playlists are declared in the "combiner" specification of the playlist config with playlists whose names are the boolean algebra expressions used to construct them.

Here's an example combiner playlist to illustrate this:

((Dubstep ~ [1-3]) | {playlist: My Favorites} | (*Techno & [135-145])) & Dark

The resulting combiner playlist will be comprised of tracks that are:

  • tagged as "Dubstep" but NOT having a rating less than 4
  • OR in the playlist called "My Favorites"
  • OR tagged as something ending with "Techno" AND in the BPM range of 135 to 145
  • AND tagged as "Dark"

Parameters:

Name Type Description Default
config BaseConfig

Configuration object.

required
path Optional[Path]

Path to write the new collection to.

None
Source code in djtools/collection/playlist_builder.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
@make_path
def collection_playlists(config: BaseConfig, path: Optional[Path] = None):
    """Builds playlists automatically.

    By maintaining a collection with tracks having tag data (e.g. genre tags,
    Rekordbox's "My Tags", etc.) and providing a playlist config which
    specifies a desired playlist structure based around these tags, users can
    automatically generate that playlist structure.

    The playlist config is a YAML file which specifies a nested structure of
    folders. Each folder is declared with a "name" and a list of "playlists"
    which may be either more folder declarations or else strings matching a tag
    in your collection.

    Any folder that has more than one playlist within it will automatically
    have an "All <folder name>" playlist added to it that contains the set of
    tracks from all the other playlists in that folder.

    Any tag in your collection that is not specified in the playlist config
    will automatically be added to either an "Unused Tags" folder of playlists
    or a single "Unused Tags" playlist (depending on your configured choice
    for collection_playlists_remainder).

    A special folder with the name "_ignore" may be included anywhere within
    the "tags" specification with playlists matching the set of tags to ignore
    when creating the "Unused Tags" folder / playlist.

    In addition to creating playlists from tags, this function also supports
    creating "combiner" playlists by evaluating boolean algebra expressions.
    This is an incredibly powerful feature which allows users to apply set
    operations {union, intersection, and difference} to a diverse range of
    operands {tag, playlists, BPMs, ratings, etc.}.

    Combiner playlists are declared in the "combiner" specification of the
    playlist config with playlists whose names are the boolean algebra
    expressions used to construct them.

    Here's an example combiner playlist to illustrate this:

        ((Dubstep ~ [1-3]) | {playlist: My Favorites} | (*Techno & [135-145])) & Dark

    The resulting combiner playlist will be comprised of tracks that are:

    - tagged as "Dubstep" but NOT having a rating less than 4
    - OR in the playlist called "My Favorites"
    - OR tagged as something ending with "Techno" AND in the BPM range of 135
    to 145
    - AND tagged as "Dark"

    Args:
        config: Configuration object.
        path: Path to write the new collection to.
    """
    # Check if the playlist config is populated before continuing.
    if not (
        config.collection.playlist_config.tags
        or config.collection.playlist_config.combiner
    ):
        logger.warning(
            "Not building playlists because the playlist config is empty."
        )
        return

    # Load the collection.
    collection = PLATFORM_REGISTRY[config.collection.platform]["collection"](
        path=config.collection.collection_path
    )

    # Get the Playlist implementation to use for this collection.
    playlist_class = PLATFORM_REGISTRY[config.collection.platform]["playlist"]

    # Required number of tracks to make tag and combiner playlists.
    minimum_tag_tracks = config.collection.minimum_tag_playlist_tracks
    minimum_combiner_tracks = (
        config.collection.minimum_combiner_playlist_tracks
    )

    # Create a dict of tracks keyed by their individual tags.
    tags_tracks = defaultdict(dict)
    for track_id, track in collection.get_tracks().items():
        for tag in track.get_tags():
            tags_tracks[tag][track_id] = track

    # This will hold the playlists being built.
    auto_playlists = []

    # List of PlaylistFilter implementations to run against built playlists.
    filters = [
        getattr(playlist_filters, playlist_filter.value)()
        for playlist_filter in config.collection.collection_playlist_filters
    ]

    # Create playlists for the "tags" portion of the playlist config.
    if config.collection.playlist_config.tags:
        # A set of tags seen is maintained while creating the tags playlists so
        # that they are ignored when creating the "Unused Tags" playlists.
        seen_tags = set()
        tag_playlists = build_tag_playlists(
            config.collection.playlist_config.tags,
            tags_tracks,
            playlist_class,
            seen_tags,
            minimum_tracks=minimum_tag_tracks,
        )

        # The tag playlists must have their "parent" attribute set so that
        # PlaylistFilter implementations may apply logic that depends on the
        # relative position of the playlist with the playlist tree.
        tag_playlists.set_parent()

        # Apply the filtering logic of the configured PlaylistFilter implementations.
        filter_tag_playlists(tag_playlists, filters)

        # Recursively traverse the playlist tree and create "all" playlists
        # within each folder containing more than one playlist. These "all"
        # playlists aggregate the set of tracks contained within all the other
        # playlists within the same folder.
        _ = aggregate_playlists(
            tag_playlists, playlist_class, minimum_tag_tracks
        )

        auto_playlists.extend(tag_playlists)

        # Identify the set of tags that did not appear in the playlist config
        # and create either an "Unused Tags" folder of playlists or simply an
        # "Unused Tags" playlist.
        other_tags = sorted(set(tags_tracks).difference(seen_tags))
        if (
            config.collection.collection_playlists_remainder
            == PlaylistRemainder.FOLDER
        ):
            auto_playlists.append(
                build_tag_playlists(
                    PlaylistConfigContent(
                        name="Unused Tags", playlists=other_tags
                    ),
                    tags_tracks,
                    playlist_class,
                    minimum_tracks=minimum_tag_tracks,
                )
            )
        else:
            # Membership is tested once per tag against a set; the sorted list
            # made every lookup linear. Iterating tags_tracks (rather than
            # other_tags) keeps the original track insertion order.
            unused_tags = set(other_tags)
            auto_playlists.append(
                build_tag_playlists(
                    "Unused Tags",
                    {
                        "Unused Tags": {
                            track_id: track
                            for tag, track_dict in tags_tracks.items()
                            if tag in unused_tags
                            for track_id, track in track_dict.items()
                        }
                    },
                    playlist_class,
                    minimum_tracks=minimum_tag_tracks,
                )
            )

    # Create playlists for the "combiner" portion of the playlist config.
    if config.collection.playlist_config.combiner:
        # Parse selectors from the combiner playlist names and update the
        # tags_tracks mapping.
        add_selectors_to_tags(
            config.collection.playlist_config.combiner,
            tags_tracks,
            collection,
            auto_playlists,
        )

        # Evaluate the boolean logic of the combiner playlists.
        combiner_playlists = build_combiner_playlists(
            config.collection.playlist_config.combiner,
            tags_tracks,
            playlist_class,
            minimum_tracks=minimum_combiner_tracks,
        )

        # The combiner playlists must have their "parent" attribute set so
        # that PlaylistFilter implementations may apply logic that depends on
        # the relative position of the playlist with the playlist tree.
        combiner_playlists.set_parent()

        # Apply the filtering logic of the configured PlaylistFilter implementations.
        filter_tag_playlists(combiner_playlists, filters)

        # Recursively traverse the playlist tree and create "all" playlists
        # within each folder containing more than one playlist. These "all"
        # playlists aggregate the set of tracks contained within all the other
        # playlists within the same folder.
        _ = aggregate_playlists(
            combiner_playlists, playlist_class, minimum_combiner_tracks
        )

        auto_playlists.extend(combiner_playlists)

        # Print tag statistics for each combiner playlist.
        if config.verbosity and combiner_playlists:
            print_playlists_tag_statistics(combiner_playlists)

    # Remove any previous playlist builder playlists.
    previous_playlists = collection.get_playlists(name=PLAYLIST_NAME)
    root = collection.get_playlists()
    for playlist in previous_playlists:
        root.remove_playlist(playlist)

    # Insert a new playlist containing the built playlists.
    auto_playlist = playlist_class.new_playlist(
        name=PLAYLIST_NAME, playlists=auto_playlists
    )
    auto_playlist.set_parent(collection.get_playlists())
    collection.add_playlist(auto_playlist)
    collection.serialize(path=path)

    num_playlists = collection.get_playlists().get_number_of_playlists()
    logger.info(f"{PLAYLIST_NAME} generated with {num_playlists} playlists")

This module contains the PlaylistFilter abstract base class and its implementations.

PlaylistFilter subclasses implement an 'is_filter_playlist' method and a 'filter_track' method.

The 'is_filter_playlist' method, when given a 'Playlist', returns true if that 'Playlist' should have its tracks filtered.

The 'filter_track' method, when given a 'Track', returns true if that 'Track' should remain in the playlist.

ComplexTrackFilter

Bases: PlaylistFilter

This class filters "complex" playlists.

This PlaylistFilter looks for playlists with "complex" in their name or in the name of a parent playlist. When found, tracks contained in the playlist must have no fewer than 'min_tags_for_complex_track' non-genre tags (not counting excluded tags) in order to remain in the playlist.

Source code in djtools/collection/playlist_filters.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
class ComplexTrackFilter(PlaylistFilter):
    """This class filters "complex" playlists.

    This PlaylistFilter looks for playlists with "complex" in their name or in
    the name of a parent playlist. When found, tracks contained in the playlist
    must have no fewer than 'min_tags_for_complex_track' non-genre,
    non-excluded tags in order to remain in the playlist.
    """

    def __init__(
        self,
        min_tags_for_complex_track: Optional[int] = 3,
        exclude_tags: Optional[List[str]] = None,
    ):
        """Constructor.

        Args:
            min_tags_for_complex_track: Minimum number of non-genre,
                non-excluded tags a track needs in order to be considered
                "complex".
            exclude_tags: Tags to ignore when determining the number of
                non-genre tags. A built-in list is used when None.
        """
        super().__init__()
        self._min_tags_for_complex_track = min_tags_for_complex_track
        # The default is created per call so instances never share a list.
        if exclude_tags is None:
            exclude_tags = [
                "DELETE",
                "Flute",
                "Guitar",
                "Horn",
                "Piano",
                "Scratch",
                "Strings",
                "Vocal",
            ]
        self._exclude_tags = set(exclude_tags)

    def filter_track(self, track: Track) -> bool:
        """Returns True if this track should remain in the playlist.

        A track remains when, after removing its genre tags and the excluded
        tags, it still has at least one tag and at least
        'min_tags_for_complex_track' tags.

        Args:
            track: Track object to apply filter to.

        Returns:
            Whether or not this track should be included in the playlist.
        """
        other_tags = (
            set(track.get_tags())
            .difference(set(track.get_genre_tags()))
            .difference(self._exclude_tags)
        )

        # bool() keeps the declared return type: a bare "other_tags and ..."
        # would hand back the empty set itself when no tags are left.
        return (
            bool(other_tags)
            and len(other_tags) >= self._min_tags_for_complex_track
        )

    def is_filter_playlist(self, playlist: Playlist) -> bool:
        """Returns True if this playlist should be filtered.

        The playlist qualifies when its own name, or the name of any ancestor
        playlist, contains "complex" (case-insensitive).

        Args:
            playlist: Playlist object to potentially filter.

        Returns:
            Whether or not to filter this playlist.
        """
        # A substring test is equivalent to searching for r".*complex.*".
        if "complex" in playlist.get_name().lower():
            return True

        # Walk up the tree until an ancestor matches or the root is passed.
        parent = playlist.get_parent()
        while parent:
            if "complex" in parent.get_name().lower():
                return True
            parent = parent.get_parent()

        return False

__init__(min_tags_for_complex_track=3, exclude_tags=None)

Constructor.

Parameters:

Name Type Description Default
min_tags_for_complex_track Optional[int]

Minimum number of non-genre, non-excluded tags a track needs in order to be considered "complex".

3
exclude_tags Optional[List[str]]

Tags to ignore when determining the number of non-genre tags.

None
Source code in djtools/collection/playlist_filters.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def __init__(
    self,
    min_tags_for_complex_track: Optional[int] = 3,
    exclude_tags: Optional[List[str]] = None,
):
    """Constructor.

    Args:
        min_tags_for_complex_track: Minimum number of non-genre,
            non-excluded tags a track needs in order to be considered
            "complex".
        exclude_tags: Tags to ignore when determining the number of
            non-genre tags. A built-in list is used when None.
    """
    super().__init__()
    self._min_tags_for_complex_track = min_tags_for_complex_track
    # The default list is created per call rather than as a default argument
    # so that instances never share one mutable list.
    if exclude_tags is None:
        exclude_tags = [
            "DELETE",
            "Flute",
            "Guitar",
            "Horn",
            "Piano",
            "Scratch",
            "Strings",
            "Vocal",
        ]
    # Stored as a set regardless of the sequence type passed in.
    self._exclude_tags = set(exclude_tags)

filter_track(track)

Returns True if this track should remain in the playlist.

Parameters:

Name Type Description Default
track Track

Track object to apply filter to.

required

Returns:

Type Description
bool

Whether or not this track should be included in the playlist.

Source code in djtools/collection/playlist_filters.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
def filter_track(self, track: Track) -> bool:
    """Returns True if this track should remain in the playlist.

    A track remains when, after removing its genre tags and the excluded
    tags, it still has at least one tag and at least
    'min_tags_for_complex_track' tags.

    Args:
        track: Track object to apply filter to.

    Returns:
        Whether or not this track should be included in the playlist.
    """
    other_tags = (
        set(track.get_tags())
        .difference(set(track.get_genre_tags()))
        .difference(self._exclude_tags)
    )

    # bool() keeps the declared return type: a bare "other_tags and ..." would
    # hand back the empty set itself when no tags are left.
    return (
        bool(other_tags)
        and len(other_tags) >= self._min_tags_for_complex_track
    )

is_filter_playlist(playlist)

Returns True if this playlist should be filtered.

Parameters:

Name Type Description Default
playlist Playlist

Playlist object to potentially filter.

required

Returns:

Type Description
bool

Whether or not to filter this playlist.

Source code in djtools/collection/playlist_filters.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
def is_filter_playlist(self, playlist: Playlist) -> bool:
    """Decide whether this filter applies to the given playlist.

    The filter applies when the lowercased name of the playlist itself, or of
    any of its ancestors, contains "complex".

    Args:
        playlist: Playlist object to potentially filter.

    Returns:
        True when the playlist or one of its ancestors is a "complex"
        playlist, False otherwise.
    """
    matcher = re.compile(r".*complex.*")

    # The playlist itself is always inspected, whatever its truthiness.
    if matcher.search(playlist.get_name().lower()):
        return True

    # Climb towards the root, stopping at the first ancestor that matches.
    ancestor = playlist.get_parent()
    while ancestor:
        if matcher.search(ancestor.get_name().lower()):
            return True
        ancestor = ancestor.get_parent()

    return False

HipHopFilter

Bases: PlaylistFilter

This class filters playlists called "Hip Hop".

Source code in djtools/collection/playlist_filters.py
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
class HipHopFilter(PlaylistFilter):
    """This class filters playlists called "Hip Hop".

    The filter is stateful: is_filter_playlist records whether the playlist
    being examined sits underneath a folder called "Bass", and filter_track
    reads that flag to decide which tracks to keep.
    """

    def __init__(self):
        """Constructor.

        Initializes the "Bass" flag so that filter_track can be called safely
        even before is_filter_playlist has run.
        """
        super().__init__()
        self._bass_hip_hop = False

    def filter_track(self, track: Track) -> bool:
        """Returns True if this track should remain in the playlist.

        If the playlist is not underneath a folder called "Bass", then this
        track is filtered out unless it has exclusively "Hip Hop" and "R&B"
        genre tags. If the playlist is underneath a folder called "Bass", then
        this track is filtered out if it does have exclusively "Hip Hop" and
        "R&B" genre tags. A track without genre tags counts as having
        exclusively such tags.

        Args:
            track: Track object to apply filter to.

        Returns:
            Whether or not this track should be included in the playlist.
        """
        only_hip_hop = all(
            "r&b" in tag.lower() or "hip hop" in tag.lower()
            for tag in track.get_genre_tags()
        )

        # Under "Bass" the pure hip hop tracks are dropped; elsewhere they are
        # the only ones kept.
        if self._bass_hip_hop:
            return not only_hip_hop

        return only_hip_hop

    def is_filter_playlist(self, playlist: Playlist) -> bool:
        """Returns True if this playlist's name is "Hip Hop".

        Also updates the internal flag recording whether the playlist has an
        ancestor called "Bass".

        Args:
            playlist: Playlist object to potentially filter.

        Returns:
            Whether or not to filter this playlist.
        """
        # Reset the state left behind by the previously examined playlist.
        self._bass_hip_hop = False

        if playlist.get_name() != "Hip Hop":
            return False

        # Look for a "Bass" folder anywhere above this playlist.
        parent = playlist.get_parent()
        while parent:
            if parent.get_name() == "Bass":
                self._bass_hip_hop = True
                break
            parent = parent.get_parent()

        return True

filter_track(track)

Returns True if this track should remain in the playlist.

If the playlist is not underneath a folder called "Bass", then this track is filtered out unless it has exclusively "Hip Hop" and "R&B" genre tags. If the playlist is underneath a folder called "Bass", then this track is filtered out if it does have exclusively "Hip Hop" and "R&B" genre tags.

Parameters:

Name Type Description Default
track Track

Track object to apply filter to.

required

Returns:

Type Description
bool

Whether or not this track should be included in the playlist.

Source code in djtools/collection/playlist_filters.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def filter_track(self, track: Track) -> bool:
    """Decide whether a track stays in a "Hip Hop" playlist.

    Outside of a "Bass" folder only tracks whose genre tags are all "Hip Hop"
    or "R&B" tags are kept. Underneath a "Bass" folder it is the other way
    around: tracks whose genre tags are all "Hip Hop" or "R&B" tags are
    removed. A track without genre tags counts as having exclusively such
    tags.

    Args:
        track: Track object to apply filter to.

    Returns:
        True when the track should be included in the playlist.
    """

    def _is_hip_hop_tag(genre: str) -> bool:
        lowered = genre.lower()
        return "r&b" in lowered or "hip hop" in lowered

    # The flag is read before the genre tags are fetched.
    under_bass = self._bass_hip_hop
    exclusively_hip_hop = all(
        _is_hip_hop_tag(genre) for genre in track.get_genre_tags()
    )

    return not exclusively_hip_hop if under_bass else exclusively_hip_hop

is_filter_playlist(playlist)

Returns True if this playlist's name is "Hip Hop".

Parameters:

Name Type Description Default
playlist Playlist

Playlist object to potentially filter.

required

Returns:

Type Description
bool

Whether or not to filter this playlist.

Source code in djtools/collection/playlist_filters.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def is_filter_playlist(self, playlist: Playlist) -> bool:
    """Decide whether this filter applies to the given playlist.

    Only playlists named exactly "Hip Hop" are filtered. As a side effect the
    "Bass" flag read by filter_track is reset and then set again if any
    ancestor of the playlist is called "Bass".

    Args:
        playlist: Playlist object to potentially filter.

    Returns:
        True when the playlist is named "Hip Hop", False otherwise.
    """
    # Start from a clean slate for every playlist examined.
    self._bass_hip_hop = False  # pylint: disable=attribute-defined-outside-init

    if playlist.get_name() == "Hip Hop":
        # Climb the tree; one "Bass" ancestor is enough.
        ancestor = playlist.get_parent()
        while ancestor:
            if ancestor.get_name() == "Bass":
                self._bass_hip_hop = True  # pylint: disable=attribute-defined-outside-init
                break
            ancestor = ancestor.get_parent()
        return True

    return False

MinimalDeepTechFilter

Bases: PlaylistFilter

This class filters playlists called "Minimal Deep Tech".

Source code in djtools/collection/playlist_filters.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
class MinimalDeepTechFilter(PlaylistFilter):
    """This class filters playlists called "Minimal Deep Tech".

    The filter is stateful: is_filter_playlist records whether the playlist
    being examined sits underneath a "Techno" and / or a "House" folder, and
    filter_track reads those flags to decide which tracks to keep.
    """

    def __init__(self):
        """Constructor.

        Initializes the folder flags so that filter_track can be called safely
        even before is_filter_playlist has run.
        """
        super().__init__()
        self._techno = False
        self._house = False

    def filter_track(self, track: Track) -> bool:
        """Returns True if this track should remain in the playlist.

        If the playlist is underneath a folder called "Techno", then this
        track is filtered out unless one of its genre tags contains "techno"
        (case-insensitive). If the playlist is underneath a folder called
        "House", then this track is filtered out unless one of its genre tags
        contains "house" (case-insensitive).

        Args:
            track: Track object to apply filter to.

        Returns:
            Whether or not this track should be included in the playlist.
        """
        genre_tags = [tag.lower() for tag in track.get_genre_tags()]

        # Substring tests are equivalent to searching for r".*techno.*" and
        # r".*house.*" in the lowercased tag.
        if self._techno and not any("techno" in tag for tag in genre_tags):
            return False
        if self._house and not any("house" in tag for tag in genre_tags):
            return False

        return True

    def is_filter_playlist(self, playlist: Playlist) -> bool:
        """Returns True if this "Minimal Deep Tech" playlist must be filtered.

        The playlist is filtered when its name is "Minimal Deep Tech" and at
        least one of its ancestors is called "Techno" or "House". The flags
        read by filter_track are updated as a side effect.

        Args:
            playlist: Playlist object to potentially filter.

        Returns:
            Whether or not to filter this playlist.
        """
        # Reset the state left behind by the previously examined playlist.
        self._techno = False
        self._house = False

        if playlist.get_name() != "Minimal Deep Tech":
            return False

        # Every ancestor is visited since both folders may appear in the path.
        parent = playlist.get_parent()
        while parent:
            if parent.get_name() == "Techno":
                self._techno = True
            if parent.get_name() == "House":
                self._house = True
            parent = parent.get_parent()

        return self._techno or self._house

filter_track(track)

Returns True if this track should remain in the playlist.

If the playlist is underneath a folder called "Techno", then this track is filtered out unless one of its genre tags contains "Techno". If the playlist is underneath a folder called "House", then this track is filtered out unless one of its genre tags contains "House". Matching is case-insensitive.

Parameters:

Name Type Description Default
track Track

Track object to apply filter to.

required

Returns:

Type Description
bool

Whether or not this track should be included in the playlist.

Source code in djtools/collection/playlist_filters.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def filter_track(self, track: Track) -> bool:
    """Decide whether a track stays in a "Minimal Deep Tech" playlist.

    When the playlist is underneath a "Techno" folder the track needs a genre
    tag containing "techno"; when it is underneath a "House" folder the track
    needs a genre tag containing "house". Matching is done on the lowercased
    tags.

    Args:
        track: Track object to apply filter to.

    Returns:
        True when the track should be included in the playlist.
    """
    house_exp = re.compile(r".*house.*")
    techno_exp = re.compile(r".*techno.*")

    # Lowercase each genre tag once and test both patterns against it.
    genres = [genre.lower() for genre in track.get_genre_tags()]
    has_house = any(house_exp.search(genre) for genre in genres)
    has_techno = any(techno_exp.search(genre) for genre in genres)

    lacks_required_techno = self._techno and not has_techno
    lacks_required_house = self._house and not has_house

    return not (lacks_required_techno or lacks_required_house)

is_filter_playlist(playlist)

Returns True if this playlist's name is "Minimal Deep Tech".

Parameters:

Name Type Description Default
playlist Playlist

Playlist object to potentially filter.

required

Returns:

Type Description
bool

Whether or not to filter this playlist.

Source code in djtools/collection/playlist_filters.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
def is_filter_playlist(self, playlist: Playlist) -> bool:
    """Decide whether this filter applies to the given playlist.

    The filter applies to playlists named exactly "Minimal Deep Tech" that
    have at least one ancestor called "Techno" or "House". The two flags read
    by filter_track are reset and then updated as a side effect.

    Args:
        playlist: Playlist object to potentially filter.

    Returns:
        True when the playlist should have its tracks filtered.
    """
    # Forget whatever the previously examined playlist left behind.
    self._techno = self._house = False  # pylint: disable=attribute-defined-outside-init

    if playlist.get_name() == "Minimal Deep Tech":
        # Every ancestor is visited since both folders may appear in the path.
        ancestor = playlist.get_parent()
        while ancestor:
            folder_name = ancestor.get_name()
            if folder_name == "Techno":
                self._techno = True  # pylint: disable=attribute-defined-outside-init
            elif folder_name == "House":
                self._house = True  # pylint: disable=attribute-defined-outside-init
            ancestor = ancestor.get_parent()

    return self._techno or self._house

PlaylistFilter

Bases: ABC

This class defines an interface for filtering tracks from playlists.

Source code in djtools/collection/playlist_filters.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class PlaylistFilter(ABC):
    """This class defines an interface for filtering tracks from playlists.

    Subclasses implement 'is_filter_playlist' to say whether a playlist should
    have its tracks filtered, and 'filter_track' to say whether an individual
    track should remain in such a playlist.
    """

    @abstractmethod
    def filter_track(self, track: Track) -> bool:
        """Returns True if this track should remain in the playlist.

        Args:
            track: Track object to apply filter to.

        Returns:
            Whether or not this track should be included in the playlist.
        """

    @abstractmethod
    def is_filter_playlist(self, playlist: Playlist) -> bool:
        """Returns True if this playlist should be filtered.

        Args:
            playlist: Playlist object to potentially filter.

        Returns:
            Whether or not to filter this playlist.
        """

filter_track(track) abstractmethod

Returns True if this track should remain in the playlist.

Parameters:

Name Type Description Default
track Track

Track object to apply filter to.

required

Returns:

Type Description
bool

Whether or not this track should be included in the playlist.

Source code in djtools/collection/playlist_filters.py
25
26
27
28
29
30
31
32
33
34
@abstractmethod
def filter_track(self, track: Track) -> bool:
    """Returns True if this track should remain in the playlist.

    Abstract: the body consists of this docstring only, so each concrete
    filter must supply its own implementation.

    Args:
        track: Track object to apply filter to.

    Returns:
        Whether or not this track should be included in the playlist.
    """

is_filter_playlist(playlist) abstractmethod

Returns True if this playlist should be filtered.

Parameters:

Name Type Description Default
playlist Playlist

Playlist object to potentially filter.

required

Returns:

Type Description
bool

Whether or not to filter this playlist.

Source code in djtools/collection/playlist_filters.py
36
37
38
39
40
41
42
43
44
45
@abstractmethod
def is_filter_playlist(self, playlist: Playlist) -> bool:
    """Returns True if this playlist should be filtered.

    Abstract: the body consists of this docstring only, so each concrete
    filter must supply its own implementation.

    Args:
        playlist: Playlist object to potentially filter.

    Returns:
        Whether or not to filter this playlist.
    """

TransitionTrackFilter

Bases: PlaylistFilter

This class filters "transition" playlists.

This PlaylistFilter looks for playlists with "transition" in their name or in the name of a parent playlist. When found, tracks contained in the playlist must have a square bracket enclosed set of transition tokens (forward-slash delimited list of floats, for BPMs, or otherwise, for genres).

Source code in djtools/collection/playlist_filters.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
class TransitionTrackFilter(PlaylistFilter):
    """Filter that prunes tracks in "transition" playlists.

    A playlist is handled by this filter when "transition" appears in its own
    name or in the name of any ancestor playlist, and its own name also
    contains exactly one supported type ("genre" or "tempo"). Tracks remain in
    such a playlist only if their comments contain a square bracket enclosed
    set of transition tokens fitting that type: all tokens parse as floats for
    "tempo"; at least one token does not for "genre".
    """

    def __init__(self, separator: Optional[str] = "/"):
        """Constructor.

        Args:
            separator: Character used to separate transition tokens.
        """
        super().__init__()
        # Unknown until is_filter_playlist accepts a playlist.
        self._playlist_type = None
        self._separator = separator

    def filter_track(self, track: Track) -> bool:
        """Returns True if this track should remain in the playlist.

        Every square bracket enclosed group in the track's comments is
        classified as "tempo" (all of its tokens parse as floats) or "genre"
        (parsing raised ValueError). The track is kept when any group's
        classification equals the playlist type recorded by the most recent
        is_filter_playlist call.

        Args:
            track: Track object to apply filter to.

        Returns:
            Whether or not this track should be included in the playlist.
        """
        bracket_groups = re.compile(r"\[([^]]+)\]").findall(
            track.get_comments()
        )
        keep_track = False
        for group in bracket_groups:
            try:
                # The split stays inside the try: a ValueError raised here is
                # treated the same way as a non-numeric token.
                for token in group.split(self._separator):
                    float(token.strip())
            except ValueError:
                group_type = "genre"
            else:
                group_type = "tempo"
            if group_type == self._playlist_type:
                keep_track = True

        return keep_track

    def is_filter_playlist(self, playlist: Playlist) -> bool:
        """Returns True if this playlist should be filtered.

        The playlist qualifies when "transition" occurs in its own name or in
        the name of any ancestor, and its own name contains exactly one of the
        supported transition playlist types. The matched type is stored on
        the filter for later filter_track calls.

        Args:
            playlist: Playlist object to potentially filter.

        Raises:
            ValueError: The playlist name matches more than one playlist type.

        Returns:
            Whether or not to filter this playlist.
        """
        transition_exp = re.compile(r".*transition.*")

        # Start with the playlist itself, then climb through its ancestors
        # until one of them mentions "transition" or the root is passed.
        found_transition = bool(
            re.search(transition_exp, playlist.get_name().lower())
        )
        ancestor = playlist.get_parent()
        while not found_transition and ancestor:
            found_transition = bool(
                re.search(transition_exp, ancestor.get_name().lower())
            )
            ancestor = ancestor.get_parent()

        if not found_transition:
            return False

        # Exactly one supported type may appear in the playlist's own name.
        self._playlist_type = None
        type_patterns = (
            ("genre", re.compile(r".*genre.*")),
            ("tempo", re.compile(r".*tempo.*")),
        )
        for candidate_type, type_exp in type_patterns:
            if re.search(type_exp, playlist.get_name().lower()) is None:
                continue
            if self._playlist_type:
                raise ValueError(
                    f'"{playlist.get_name()}" matches multiple playlist types:'
                    f" {self._playlist_type}, {candidate_type}"
                )
            self._playlist_type = candidate_type

        return bool(self._playlist_type)

__init__(separator='/')

Constructor.

Parameters:

Name Type Description Default
separator Optional[str]

Character used to separate transition tokens.

'/'
Source code in djtools/collection/playlist_filters.py
257
258
259
260
261
262
263
264
265
def __init__(self, separator: Optional[str] = "/"):
    """Constructor.

    Args:
        separator: Character used to separate transition tokens.
    """
    super().__init__()
    # Stored for later use when splitting transition tokens.
    self._separator = separator
    # No playlist type is recorded at construction time.
    self._playlist_type = None

filter_track(track)

Returns True if this track should remain in the playlist.

Matches square bracket enclosed tokens representing transitions for supported playlist types.

Parameters:

Name Type Description Default
track Track

Track object to apply filter to.

required

Returns:

Type Description
bool

Whether or not this track should be included in the playlist.

Source code in djtools/collection/playlist_filters.py
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
def filter_track(self, track: Track) -> bool:
    """Returns True if this track should remain in the playlist.

    Every square bracket enclosed group in the track's comments is classified
    as "tempo" (all of its tokens parse as floats) or "genre" (parsing raised
    ValueError). The track is kept when any group's classification equals the
    playlist type currently stored on the filter.

    Args:
        track: Track object to apply filter to.

    Returns:
        Whether or not this track should be included in the playlist.
    """
    bracket_groups = re.compile(r"\[([^]]+)\]").findall(track.get_comments())
    keep_track = False
    for group in bracket_groups:
        try:
            # The split stays inside the try: a ValueError raised here is
            # treated the same way as a non-numeric token.
            for token in group.split(self._separator):
                float(token.strip())
        except ValueError:
            group_type = "genre"
        else:
            group_type = "tempo"
        if group_type == self._playlist_type:
            keep_track = True

    return keep_track

is_filter_playlist(playlist)

Returns True if this playlist should be filtered.

Identifies playlists that have exactly one supported transition playlist type ("genre" or "tempo") in their name and that have "transition" either in their own name or in the name of any parent playlist.

Parameters:

Name Type Description Default
playlist Playlist

Playlist object to potentially filter.

required

Returns:

Type Description
bool

Whether or not to filter this playlist.

Source code in djtools/collection/playlist_filters.py
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
def is_filter_playlist(self, playlist: Playlist) -> bool:
    """Returns True if this playlist should be filtered.

    The playlist qualifies when "transition" occurs in its own name or in the
    name of any ancestor, and its own name contains exactly one of the
    supported transition playlist types. The matched type is stored on the
    filter for later filter_track calls.

    Args:
        playlist: Playlist object to potentially filter.

    Raises:
        ValueError: The playlist name matches more than one playlist type.

    Returns:
        Whether or not to filter this playlist.
    """
    transition_exp = re.compile(r".*transition.*")

    # Start with the playlist itself, then climb through its ancestors until
    # one of them mentions "transition" or the root is passed.
    found_transition = bool(
        re.search(transition_exp, playlist.get_name().lower())
    )
    ancestor = playlist.get_parent()
    while not found_transition and ancestor:
        found_transition = bool(
            re.search(transition_exp, ancestor.get_name().lower())
        )
        ancestor = ancestor.get_parent()

    if not found_transition:
        return False

    # Exactly one supported type may appear in the playlist's own name.
    self._playlist_type = None
    type_patterns = (
        ("genre", re.compile(r".*genre.*")),
        ("tempo", re.compile(r".*tempo.*")),
    )
    for candidate_type, type_exp in type_patterns:
        if re.search(type_exp, playlist.get_name().lower()) is None:
            continue
        if self._playlist_type:
            raise ValueError(
                f'"{playlist.get_name()}" matches multiple playlist types:'
                f" {self._playlist_type}, {candidate_type}"
            )
        self._playlist_type = candidate_type

    return bool(self._playlist_type)

This module is used to emulate shuffling the track order of one or more playlists. This is done by setting the track number attribute of each track in sequential order after collecting the set of Tracks from the provided playlist(s).

shuffle_playlists(config, path=None)

For each playlist in "shuffle_playlists", randomize the tracks and sequentially set the track number to emulate shuffling.

Parameters:

Name Type Description Default
config BaseConfig

Configuration object.

required
path Optional[Path]

Path to write the new collection to.

None
Source code in djtools/collection/shuffle_playlists.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
@make_path
def shuffle_playlists(config: BaseConfig, path: Optional[Path] = None):
    """For each playlist in "shuffle_playlists", randomize the tracks and
    sequentially set the track number to emulate shuffling.

    The tracks of every matching playlist are shuffled, gathered into one
    de-duplicated sequence, numbered from 1, and added to the collection as a
    new "SHUFFLE" playlist before the collection is serialized.

    Args:
        config: Configuration object.
        path: Path to write the new collection to.

    Raises:
        LookupError: A name in "shuffle_playlists" matches no playlist in the
            collection.
    """
    platform = PLATFORM_REGISTRY[config.collection.platform]

    # Load collection.
    collection = platform["collection"](
        path=config.collection.collection_path
    )

    # Build a dict of tracks to shuffle from the provided list of playlists.
    # Keying by track ID means a track shared by several playlists is only
    # numbered once (it keeps the position of its first occurrence).
    shuffled_tracks = {}
    for playlist_name in config.collection.shuffle_playlists:
        playlists = collection.get_playlists(playlist_name)
        if not playlists:
            raise LookupError(f"{playlist_name} not found")
        for playlist in playlists:
            tracks = playlist.get_tracks()
            track_keys = list(tracks.keys())
            random.shuffle(track_keys)
            shuffled_tracks.update({key: tracks[key] for key in track_keys})

    # Apply the shuffled track number to the attribute of the tracks.
    shuffled_tracks = list(shuffled_tracks.values())

    # os.cpu_count() returns None when the count cannot be determined, so
    # fall back to a single CPU instead of failing on None * 4.
    max_workers = (os.cpu_count() or 1) * 4  # pylint: disable=no-member
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(track.set_track_number, number)
            for number, track in enumerate(shuffled_tracks, start=1)
        ]
        for future in tqdm(
            as_completed(futures),
            total=len(futures),
            desc=f"Randomizing {len(futures)} tracks",
        ):
            # Surfaces any exception raised inside the worker.
            future.result()

    # Insert a new playlist containing just the shuffled tracks.
    collection.add_playlist(
        platform["playlist"].new_playlist(
            name="SHUFFLE",
            tracks={track.get_id(): track for track in shuffled_tracks},
        )
    )
    _ = collection.serialize(path=path)

This module is used to copy the audio files from the provided playlists to a new location and serialize a new collection with those tracks pointing to these new locations.

The purpose of this utility is to:

  • backup subsets of your library
  • ensure you have easy access to a preparation independent of the setup

copy_playlists(config, path=None)

Copies tracks from provided playlists to a destination.

Serializes the collection with these playlists and updated locations.

Parameters:

Name Type Description Default
config BaseConfig

Configuration object.

required
path Optional[Path]

Path to write the new collection to.

None

Raises:

Type Description
LookupError

Playlist names in copy_playlists must exist in "collection_path".

Source code in djtools/collection/copy_playlists.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
@make_path
def copy_playlists(config: BaseConfig, path: Optional[Path] = None):
    """Copies tracks from provided playlists to a destination.

    Serializes the collection with these playlists and updated locations.
    Playlists that are neither selected nor ancestors of a selected playlist
    are removed from the serialized collection.

    Args:
        config: Configuration object.
        path: Path to write the new collection to. When not provided, the
            collection is written into the copy destination directory.

    Raises:
        LookupError: Playlist names in copy_playlists must exist in
            "collection_path".
    """
    # Load collection.
    collection = PLATFORM_REGISTRY[config.collection.platform]["collection"](
        path=config.collection.collection_path
    )

    # Create destination directory.
    destination = config.collection.copy_playlists_destination
    destination.mkdir(parents=True, exist_ok=True)

    playlist_tracks = {}
    lineage = defaultdict(set)
    playlists = []

    # Get the (non-folder) playlists from the collection.
    for playlist_name in config.collection.copy_playlists:
        found_playlists = collection.get_playlists(playlist_name)
        if not found_playlists:
            raise LookupError(f"{playlist_name} not found")
        playlists.extend(
            playlist
            for playlist in found_playlists
            if not playlist.is_folder()
        )

    # Traverse the playlist to get tracks for the desired playlists and mark
    # the rest for removal.
    for playlist in playlists:
        playlist_tracks.update(playlist.get_tracks())
        parent = playlist.get_parent()
        while parent:
            # The removal set of each ancestor is rebuilt on every pass: a
            # sibling marked earlier is dropped from the set once it has
            # itself become a key of lineage (an ancestor of a selection).
            lineage[parent] = set()
            for child in list(parent):
                if child not in playlists and child not in lineage:
                    lineage[parent].add(child)
            parent = parent.get_parent()
    collection.set_tracks(playlist_tracks)

    # Remove the extra playlists.
    for parent, children in lineage.items():
        for child in children:
            parent.remove_playlist(child)

    # Copy tracks to the destination and update their location.
    # os.cpu_count() returns None when the count cannot be determined, so
    # fall back to a single CPU instead of failing on None * 4.
    max_workers = (os.cpu_count() or 1) * 4  # pylint: disable=no-member
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(copy_file, track, destination)
            for track in playlist_tracks.values()
        ]

        with tqdm(total=len(futures), desc="Copying tracks") as pbar:
            for future in as_completed(futures):
                # Surfaces any exception raised inside the worker.
                future.result()
                pbar.update(1)

    # Unless specified, write the output collection to the same directory that
    # the files are being copied to.
    if not path:
        path = (
            destination
            / f"copied_playlists_collection{config.collection.collection_path.suffix}"
        )

    # Serialize the new collection.
    _ = collection.serialize(path=path)

This module contains helpers for the collection package.

BooleanNode

Node that contains boolean logic for a sub-expression.

Source code in djtools/collection/helpers.py
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
class BooleanNode:
    """Node that contains boolean logic for a sub-expression.

    A node accumulates operands (tag strings or already-resolved dicts of
    tracks) and set operators, and reduces them strictly left to right when
    evaluated.
    """

    def __init__(
        self,
        tags_tracks: Dict[str, Dict[str, Track]],
        parent: Optional["BooleanNode"] = None,
    ):
        """Constructor.

        Args:
            tags_tracks: Dict of tags to tracks.
            parent: BooleanNode of which this node is a sub-expression.
        """
        # Supported operator characters and the set operation each applies.
        self._ops = {
            "&": set.intersection,
            "|": set.union,
            "~": set.difference,
        }
        self._parent = parent
        self._operators = []
        self._operands = []
        self._tags_tracks = tags_tracks

    def _get_tracks(self, tag: str) -> Dict[str, Track]:
        """Gets the tracks for the provided tag.

        If the tag contains a wildcard, denoted with "*", and is not a
        numerical or string selector, then the union of tracks over every tag
        matching the wildcard pattern is returned. The pattern must match the
        whole tag, and each "*" stands for any run of characters.

        Args:
            tag: Tag for indexing tracks.

        Returns:
            Dict of track IDs to tracks for the provided tag; empty if the tag
            is unknown.
        """
        if "*" in tag and not (
            re.search(NUMERICAL_SELECTOR_REGEX, tag)
            or re.search(STRING_SELECTOR_REGEX, tag)
        ):
            # Escape the literal fragments so that regex metacharacters in a
            # tag (parentheses, "+", "?", ...) are matched verbatim; only "*"
            # acts as a wildcard.
            exp = re.compile(
                r".*".join(re.escape(part) for part in tag.split("*")) + "$"
            )
            tracks = {}
            for key in self._tags_tracks:
                if re.match(exp, key):
                    tracks.update(self._tags_tracks[key])
            return tracks

        return self._tags_tracks.get(tag, {})

    def add_operand(self, operand: Union[str, Dict[str, Track]]) -> str:
        """Add operand to BooleanNode.

        String operands are stripped of surrounding whitespace and ignored if
        nothing remains; any other operand is stored unchanged.

        Args:
            operand: Tag or track set to be evaluated.

        Returns:
            Empty string to reset tag in the parse_expression function.
        """
        if isinstance(operand, str):
            operand = operand.strip()
            if not operand:
                return ""
        self._operands.append(operand)

        return ""

    def add_operator(self, operator: str):
        """Adds a set operation to the BooleanNode.

        Args:
            operator: Character representing a set operation.

        Raises:
            KeyError: The character is not a supported operator.
        """
        self._operators.append(self._ops[operator])

    def evaluate(self) -> Dict[str, Track]:
        """Applies operators to the operands to produce a dict of tracks.

        Operators are applied strictly left to right; both the operand and
        operator lists are consumed in the process. When the node holds a
        single operand and no operators, that operand is returned as stored
        (a tag string is not resolved to tracks).

        Raises:
            RuntimeError: The boolean expression is malformed. It must contain
                one less operator than there are operands.

        Returns:
            A dict of tracks reduced from the boolean expression.
        """
        if len(self._operators) + 1 != len(self._operands):
            operands = [
                x if isinstance(x, str) else str(len(x)) + " tracks"
                for x in self._operands
            ]
            raise RuntimeError(
                "Invalid boolean expression:\n"
                f"\toperands: {operands}\n"
                f"\toperators: {[x.__name__ for x in self._operators]}"
            )
        while self._operators:
            # Tag strings are resolved lazily; dict operands are used as-is.
            tracks_a = (
                self._operands.pop(0)
                if not isinstance(self._operands[0], str)
                else self._get_tracks(tag=self._operands.pop(0))
            )
            tracks_b = (
                self._operands.pop(0)
                if not isinstance(self._operands[0], str)
                else self._get_tracks(tag=self._operands.pop(0))
            )
            operator = self._operators.pop(0)
            # The set operation runs on track IDs; the Track objects are then
            # recovered from the two operand dicts.
            track_ids = operator(set(tracks_a), set(tracks_b))
            tracks = {
                track_id: track
                for track_id, track in {**tracks_a, **tracks_b}.items()
                if track_id in track_ids
            }
            # The partial result becomes the left operand of the next step.
            self._operands.insert(0, tracks)

        return next(iter(self._operands), set())

    def get_parent(self) -> Optional["BooleanNode"]:
        """Gets the parent of the BooleanNode.

        Returns:
            Parent BooleanNode, or None if no parent was provided.
        """
        return self._parent

    def is_operator(self, char: str) -> bool:
        """Checks if a character is one that represents a set operation.

        Args:
            char: Character that may represent a set operation.

        Returns:
            Whether or not the character is an operator.
        """
        return char in self._ops

__init__(tags_tracks, parent=None)

Constructor.

Parameters:

Name Type Description Default
tags_tracks Dict[str, Dict[str, Track]]

Dict of tags to tracks.

required
parent Optional[BooleanNode]

BooleanNode of which this node is a sub-expression.

None
Source code in djtools/collection/helpers.py
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
def __init__(
    self,
    tags_tracks: Dict[str, Dict[str, Track]],
    parent: Optional["BooleanNode"] = None,
):
    """Constructor.

    Args:
        tags_tracks: Dict of tags to tracks.
        parent: BooleanNode of which this node is a sub-expression.
    """
    # Supported operator characters and the set operation each applies.
    self._ops = {
        "&": set.intersection,
        "|": set.union,
        "~": set.difference,
    }
    self._parent = parent
    # Operators and operands start empty.
    self._operators = []
    self._operands = []
    self._tags_tracks = tags_tracks

add_operand(operand)

Add operand to BooleanNode.

Parameters:

Name Type Description Default
operand str

Tag or track set to be evaluated.

required

Returns:

Type Description
str

Empty string to reset tag in the parse_expression function.

Source code in djtools/collection/helpers.py
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
def add_operand(self, operand: str) -> str:
    """Add operand to BooleanNode.

    A string operand is stripped of surrounding whitespace and dropped when
    nothing is left; any non-string operand is stored unchanged.

    Args:
        operand: Tag or track set to be evaluated.

    Returns:
        Empty string to reset tag in the parse_expression function.
    """
    cleaned = operand.strip() if isinstance(operand, str) else operand
    is_blank_tag = isinstance(cleaned, str) and not cleaned
    if not is_blank_tag:
        self._operands.append(cleaned)

    return ""

add_operator(operator)

Adds a set operation to the BooleanNode.

Parameters:

Name Type Description Default
operator str

Character representing a set operation.

required
Source code in djtools/collection/helpers.py
775
776
777
778
779
780
781
def add_operator(self, operator: str):
    """Queues the set operation represented by an operator character.

    Args:
        operator: Character representing a set operation.
    """
    set_operation = self._ops[operator]
    self._operators.append(set_operation)

evaluate()

Applies operators to the operands to produce a dict of tracks.

Raises:

Type Description
RuntimeError

The boolean expression is malformed. It must contain one less operator than there are operands.

Returns:

Type Description
Dict[str, Track]

A dict of tracks reduced from the boolean expression.

Source code in djtools/collection/helpers.py
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
def evaluate(self) -> Dict[str, Track]:
    """Reduces the stored operands with the stored operators.

    Operators are applied strictly left to right, consuming both lists. With
    a single operand and no operators, that operand is returned as stored.

    Raises:
        RuntimeError: The boolean expression is malformed. It must contain
            one less operator than there are operands.

    Returns:
        A dict of tracks reduced from the boolean expression.
    """
    if len(self._operands) - len(self._operators) != 1:
        operands = [
            str(len(item)) + " tracks" if not isinstance(item, str) else item
            for item in self._operands
        ]
        raise RuntimeError(
            "Invalid boolean expression:\n"
            f"\toperands: {operands}\n"
            f"\toperators: {[x.__name__ for x in self._operators]}"
        )

    def resolve(item):
        # Tag strings are looked up; dicts of tracks pass through unchanged.
        if isinstance(item, str):
            return self._get_tracks(tag=item)
        return item

    while self._operators:
        left = resolve(self._operands.pop(0))
        right = resolve(self._operands.pop(0))
        set_operation = self._operators.pop(0)
        # The set operation works on track IDs only.
        surviving_ids = set_operation(set(left), set(right))
        candidates = {**left, **right}
        reduced = {
            track_id: candidates[track_id]
            for track_id in candidates
            if track_id in surviving_ids
        }
        # The partial result becomes the left operand of the next step.
        self._operands.insert(0, reduced)

    return self._operands[0] if self._operands else set()

get_parent()

Gets the parent of the BooleanNode.

Returns:

Type Description
BooleanNode

Parent BooleanNode.

Source code in djtools/collection/helpers.py
825
826
827
828
829
830
831
def get_parent(self) -> Optional["BooleanNode"]:
    """Gets the parent of the BooleanNode.

    Returns:
        Parent BooleanNode as stored on this node.
    """
    return self._parent

is_operator(char)

Checks if a character is one that represents a set operation.

Parameters:

Name Type Description Default
char str

Character that may represent a set operation.

required

Returns:

Type Description
bool

Whether or not the character is an operator.

Source code in djtools/collection/helpers.py
833
834
835
836
837
838
839
840
841
842
def is_operator(self, char: str) -> bool:
    """Checks if a character is one that represents a set operation.

    Args:
        char: Character that may represent a set operation.

    Returns:
        Whether or not the character is an operator.
    """
    # Membership test against the keys of the node's operator lookup.
    return char in self._ops

add_selectors_to_tags(content, tags_tracks, collection, auto_playlists)

Recursively update the track lookup with selectors.

Parameters:

Name Type Description Default
content Union[PlaylistConfigContent, PlaylistName, str]

A component of a playlist config to create a playlist for.

required
tags_tracks Dict[str, Dict[str, Track]]

Dict of tags to tracks.

required
collection Collection

Collection object.

required
auto_playlists List[Playlist]

Tag playlists built in this same run.

required
Source code in djtools/collection/helpers.py
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
def add_selectors_to_tags(
    content: Union[PlaylistConfigContent, PlaylistName, str],
    tags_tracks: Dict[str, Dict[str, Track]],
    collection: Collection,
    auto_playlists: List[Playlist],
):
    """Recursively update the track lookup with selectors.

    Folders are descended into; for each leaf, the numerical, string, and
    playlist selectors found in its tag content are parsed, and tags_tracks
    gains an entry per selector holding the matching tracks. Selectors whose
    key is already present in tags_tracks are skipped. The lookup is mutated
    in place and nothing is returned.

    Args:
        content: A component of a playlist config to create a playlist for.
        tags_tracks: Dict of tags to tracks. NOTE(review): entries are written
            as tags_tracks[tag][track_id] for keys just found to be absent, so
            this presumably must auto-create inner dicts (e.g. a defaultdict)
            -- confirm against the caller.
        collection: Collection object.
        auto_playlists: Tag playlists built in this same run.
    """
    # This is a folder so parse selectors from playlists within it.
    if isinstance(content, PlaylistConfigContent):
        for playlist in content.playlists:
            add_selectors_to_tags(
                playlist, tags_tracks, collection, auto_playlists
            )
        return

    # This is not a folder so these playlists must have their selectors parsed.
    if isinstance(content, PlaylistName):
        tag_content = content.tag_content
    else:
        tag_content = content

    numerical_value_lookup = {}
    # Maps each string selector type to the Track getter providing its value.
    string_selector_type_map = {
        "artist": "get_artists",
        "comment": "get_comments",
        "date": "get_date_added",
        "key": "get_key",
        "label": "get_label",
    }
    string_value_lookup = {}
    playlists = set()

    # Grab selectors from Combiner playlist name. The parse helpers are given
    # the empty lookups (and the playlist set) declared above; presumably they
    # populate them in place -- their implementation is not visible here.
    parse_numerical_selectors(
        re.findall(NUMERICAL_SELECTOR_REGEX, tag_content),
        numerical_value_lookup,
    )
    parse_string_selectors(
        re.findall(STRING_SELECTOR_REGEX, tag_content),
        string_value_lookup,
        string_selector_type_map,
        playlists,
    )

    # Add keys for numerical selectors for tracks having those values.
    for value, tag in numerical_value_lookup.items():
        if tag in tags_tracks:
            continue

        for track_id, track in collection.get_tracks().items():
            # A numerical selector is compared against the string forms of the
            # track's rounded BPM, rating, and year alike.
            values = map(
                str,
                [round(track.get_bpm()), track.get_rating(), track.get_year()],
            )
            for val in values:
                # A str selector value must equal the track value; a tuple
                # selector value must contain it.
                if (isinstance(value, str) and value == val) or (
                    isinstance(value, tuple) and val in value
                ):
                    tags_tracks[tag][track_id] = track

    # Add keys for string selectors for tracks having those values.
    for selector, tag in string_value_lookup.items():
        if tag in tags_tracks:
            continue

        selector_type, selector_value = selector
        for track_id, track in collection.get_tracks().items():
            value = getattr(track, string_selector_type_map[selector_type])()
            # Tracks without a value for this attribute can never match.
            if not value:
                continue
            if selector_type == "date":
                inequality, date, date_format = selector_value
                # Without an inequality the dates must be equal at the
                # precision given by date_format.
                if not inequality:
                    if value.strftime(date_format) == date.strftime(
                        date_format
                    ):
                        tags_tracks[tag][track_id] = track
                    continue
                # In order for inequalities with lower precision levels than
                # YYYY-MM-DD to work properly, the date added value for the
                # track must be converted to the lower precision string and
                # back into a datetime object.
                value = datetime.strptime(
                    value.strftime(date_format), date_format
                )
                if not inequality(value, date):
                    continue
                tags_tracks[tag][track_id] = track
                continue
            # Wildcard selectors: "*" becomes ".*" and the pattern is searched
            # case-insensitively (both sides lowercased) within the value.
            if "*" in selector_value:
                exp = re.compile(r".*".join(selector_value.lower().split("*")))
                if re.search(exp, value.lower()):
                    tags_tracks[tag][track_id] = track
                    continue
            # Otherwise (or if the wildcard search failed) fall back to a
            # case-insensitive exact comparison.
            if value.lower() == selector_value.lower():
                tags_tracks[tag][track_id] = track

    # Get playlists for the identified playlist selectors. Not only must we get
    # playlists from the collection, but we must also get playlists from the
    # auto playlists constructed in the very same run of the playlist_builder.
    # This is because the playlists being selected may include those generated
    # by the playlist_builder.
    for playlist_name in playlists:
        # Produces a key of the literal form "{playlist:<name>}".
        playlist_key = f"{{playlist:{playlist_name}}}"
        if playlist_key in tags_tracks:
            continue

        for playlist_object in [collection, *auto_playlists]:
            for playlist in playlist_object.get_playlists(playlist_name):
                # Folders are skipped; only non-folder playlists contribute
                # tracks here.
                if playlist.is_folder():
                    continue
                tags_tracks[playlist_key].update(playlist.get_tracks())

aggregate_playlists(playlist, playlist_class, minimum_tracks=None)

Recursively aggregate tracks from folders into "All" playlists.

Parameters:

Name Type Description Default
playlist Playlist

Playlist which may be a folder or not.

required
playlist_class Playlist

Playlist implementation class.

required
minimum_tracks Optional[int]

Required number of tracks to make a playlist.

None

Returns:

Type Description
Dict[str, Track]

Dict of tracks.

Source code in djtools/collection/helpers.py
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
def aggregate_playlists(
    playlist: Playlist,
    playlist_class: Playlist,
    minimum_tracks: Optional[int] = None,
) -> Dict[str, Track]:
    """Collect tracks bottom-up and insert "All ..." playlists into folders.

    A non-folder playlist contributes its own tracks only when its
    ``aggregate()`` method returns a truthy value. A folder contributes the
    union of whatever its children contribute and, when the folder's own
    ``aggregate()`` is truthy and the union is large enough, receives a new
    playlist named ``"All <folder name>"`` at index 0.

    Args:
        playlist: Playlist which may be a folder or not.
        playlist_class: Playlist implementation class used to build the
            "All" playlist via ``new_playlist``.
        minimum_tracks: Required number of tracks to make a playlist. A falsy
            value disables the size check.

    Returns:
        Dict of track IDs to tracks gathered from this playlist (or from all
        playlists beneath it when it is a folder).
    """
    # Leaf playlist: hand back its tracks, or nothing if it opted out.
    if not playlist.is_folder():
        if playlist.aggregate():
            return playlist.get_tracks()
        return {}

    # Folder: merge the tracks of every child, recursing into sub-folders.
    # Later children overwrite earlier ones for duplicate track IDs.
    collected = {}
    for child in playlist:
        child_tracks = aggregate_playlists(
            child, playlist_class, minimum_tracks
        )
        for track_id, track in child_tracks.items():
            collected[track_id] = track

    # The size requirement only applies when a minimum was actually given.
    meets_minimum = not minimum_tracks or len(collected) >= minimum_tracks

    # Insert the "All" playlist only if the folder opted in to aggregation and
    # enough tracks were gathered.
    if playlist.aggregate() and meets_minimum:
        all_playlist = playlist_class.new_playlist(
            name=f"All {playlist.get_name()}", tracks=collected
        )
        playlist.add_playlist(all_playlist, index=0)

    return collected

build_combiner_playlists(content, tags_tracks, playlist_class, minimum_tracks=None)

Recursively traverses a playlist config to generate playlists from tags.

Parameters:

Name Type Description Default
content Union[PlaylistConfigContent, PlaylistName, str]

A component of a playlist config to create a playlist for.

required
tags_tracks Dict[str, Dict[str, Track]]

Dict of tags to tracks.

required
playlist_class Playlist

Playlist implementation class.

required
minimum_tracks Optional[int]

Required number of tracks to make a playlist.

None

Raises:

Type Description
ValueError

The user's playlist config must not be malformed.

Returns:

Type Description
Optional[Playlist]

A Playlist or None.

Source code in djtools/collection/helpers.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
def build_combiner_playlists(
    content: Union[PlaylistConfigContent, PlaylistName, str],
    tags_tracks: Dict[str, Dict[str, Track]],
    playlist_class: Playlist,
    minimum_tracks: Optional[int] = None,
) -> Optional[Playlist]:
    """Recursively traverses a playlist config to generate Combiner playlists.

    A ``PlaylistName`` or ``str`` is treated as a single playlist whose tracks
    come from evaluating its boolean expression with ``parse_expression``. A
    ``PlaylistConfigContent`` is treated as a folder whose children are built
    recursively.

    Args:
        content: A component of a playlist config to create a playlist for.
        tags_tracks: Dict of tags to tracks.
        playlist_class: Playlist implementation class.
        minimum_tracks: Required number of tracks to make a playlist. A falsy
            value disables the size check.

    Raises:
        ValueError: The user's playlist config must not be malformed.

    Returns:
        A Playlist, or None when a non-folder playlist's expression fails to
        parse or yields fewer than ``minimum_tracks`` tracks. A folder is
        always returned, even if none of its children produced a playlist.
    """
    if not isinstance(content, (PlaylistConfigContent, PlaylistName, str)):
        raise ValueError(f"Invalid input type {type(content)}: {content}")

    # This is not a folder so a playlist with tracks must be created.
    if isinstance(content, (PlaylistName, str)):
        # A PlaylistName separates the expression from the display name; a
        # bare string serves as both.
        if isinstance(content, PlaylistName):
            tag_content = content.tag_content
            name = content.name or tag_content
        else:
            tag_content = name = content

        # Deliberately best-effort: a bad expression skips this playlist
        # rather than aborting the whole build.
        try:
            tracks = parse_expression(tag_content, tags_tracks)
        except Exception as exc:
            logger.warning(f"Error parsing expression: {tag_content}\n{exc}")
            return None

        if minimum_tracks and len(tracks) < minimum_tracks:
            return None

        # Only folders can opt in to aggregation, so leaves pass None.
        return playlist_class.new_playlist(
            name=name, tracks=tracks, enable_aggregation=None
        )

    # This is a folder so create playlists for those playlists within it.
    playlists = []
    for item in content.playlists:
        playlist = build_combiner_playlists(
            item,
            tags_tracks,
            playlist_class,
            minimum_tracks=minimum_tracks,
        )
        if playlist:
            playlists.append(playlist)
        else:
            logger.warning(
                f"There are no tracks for the Combiner playlist: {item}"
            )
    if not playlists:
        logger.warning(
            f'There were no playlists created from "{content.playlists}"'
        )

    return playlist_class.new_playlist(
        name=content.name,
        playlists=playlists,
        enable_aggregation=content.enable_aggregation,
    )

build_tag_playlists(content, tags_tracks, playlist_class, tag_set=None, minimum_tracks=None)

Recursively traverses a playlist config to generate playlists from tags.

Parameters:

Name Type Description Default
content Union[PlaylistConfigContent, PlaylistName, str]

A component of a playlist config to create a playlist for.

required
tags_tracks Dict[str, Dict[str, Track]]

Dict of tags to tracks.

required
playlist_class Playlist

Playlist implementation class.

required
tag_set Optional[Set]

A set of tags seen while creating playlists. This is used to indicate which tags should be ignored when creating the "Unused Tags" playlists.

None
minimum_tracks Optional[int]

Required number of tracks to make a playlist.

None

Raises:

Type Description
ValueError

The user's playlist config must not be malformed.

Returns:

Type Description
Optional[Playlist]

A Playlist or None.

Source code in djtools/collection/helpers.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
def build_tag_playlists(
    content: Union[PlaylistConfigContent, PlaylistName, str],
    tags_tracks: Dict[str, Dict[str, Track]],
    playlist_class: Playlist,
    tag_set: Optional[Set] = None,
    minimum_tracks: Optional[int] = None,
) -> Optional[Playlist]:
    """Build a playlist (or folder of playlists) from a tag playlist config.

    Folders (``PlaylistConfigContent``) are traversed recursively; a folder
    named ``"_ignore"`` produces nothing and only registers its entries in
    ``tag_set``. Leaves (``PlaylistName`` or ``str``) become playlists of the
    tracks carrying the corresponding tag, with special handling for names
    beginning with ``"Pure "``.

    Args:
        content: A component of a playlist config to create a playlist for.
        tags_tracks: Dict of tags to tracks.
        playlist_class: Playlist implementation class.
        tag_set: A set of tags seen while creating playlists. This is used to
            indicate which tags should be ignored when creating the
            "Unused Tags" playlists. It is mutated in place when provided.
        minimum_tracks: Required number of tracks to make a playlist. A falsy
            value disables the size check.

    Raises:
        ValueError: The user's playlist config must not be malformed.

    Returns:
        A Playlist, or None when nothing could be built for ``content``.
    """
    if not isinstance(content, (PlaylistConfigContent, PlaylistName, str)):
        raise ValueError(f"Invalid input type {type(content)}: {content}")

    # Start a fresh set when the caller did not supply one to share.
    if tag_set is None:
        tag_set = set()

    # ---- Folder: build a playlist for every entry inside it. ----
    if isinstance(content, PlaylistConfigContent):
        # Folders can opt in to having an aggregation playlist.
        folder_aggregation = content.enable_aggregation

        # Entries of the "_ignore" folder are only recorded as seen so that
        # they are left out of the "Unused Tags" playlists.
        if content.name == "_ignore":
            tag_set.update(content.playlists)
            return None

        children = []
        for item in content.playlists:
            child = build_tag_playlists(
                item,
                tags_tracks,
                playlist_class,
                tag_set,
                minimum_tracks=minimum_tracks,
            )
            if child:
                children.append(child)

        if not children:
            logger.warning(
                f'There were no playlists created from "{content.playlists}"'
            )
            return None

        return playlist_class.new_playlist(
            name=content.name,
            playlists=children,
            enable_aggregation=folder_aggregation,
        )

    # ---- Leaf: a playlist with tracks must be created. ----

    # Only folders carry an aggregation setting.
    enable_aggregation = None

    # A PlaylistName keeps the tag and the display name apart (falling back to
    # the tag when no name is set); a plain string is both at once.
    if isinstance(content, PlaylistName):
        tag_content = content.tag_content
        name = content.name or tag_content
    else:
        tag_content = name = content

    # Regular tag playlist: look up the tag and mark it as used so it is not
    # added to the "Unused Tags" playlists.
    if not tag_content.startswith("Pure "):
        tagged_tracks = tags_tracks.get(tag_content)
        tag_set.add(tag_content)
        if not tagged_tracks:
            logger.warning(f'There are no tracks with the tag "{tag_content}"')
            return None

        if minimum_tracks and len(tagged_tracks) < minimum_tracks:
            return None

        return playlist_class.new_playlist(
            name=name,
            tracks=tagged_tracks,
            enable_aggregation=enable_aggregation,
        )

    # "Pure" playlist: keep only tracks whose genre tags ALL contain the tag
    # as a case-insensitive sub-string. For example, "Pure Techno" keeps a
    # track tagged {"Hard Techno", "Melodic Techno"} but drops one tagged
    # {"Hard Techno", "Tech House"} because "Tech House" lacks "Techno".
    tag = tag_content.split("Pure ")[-1]
    candidates = tags_tracks.get(tag)
    if not candidates:
        logger.warning(
            f'Can\'t make a "Pure {tag}" playlist because there are no '
            "tracks with that tag."
        )
        return None

    needle = tag.lower()
    pure_tracks = {}
    for track_id, track in candidates.items():
        if all(needle in genre.lower() for genre in track.get_genre_tags()):
            pure_tracks[track_id] = track

    if not pure_tracks:
        logger.warning(
            f'Can\'t make a "Pure {tag}" playlist because there are no '
            f"tracks that are pure {tag}."
        )
        return None

    if minimum_tracks and len(pure_tracks) < minimum_tracks:
        return None

    return playlist_class.new_playlist(
        name=name,
        tracks=pure_tracks,
        enable_aggregation=enable_aggregation,
    )

copy_file(track, destination)

Copies a track to a destination and updates its location.

Parameters:

Name Type Description Default
track Track

Track object.

required
destination Path

Directory to copy tracks to.

required
Source code in djtools/collection/helpers.py
51
52
53
54
55
56
57
58
59
60
61
62
63
@make_path
def copy_file(track: Track, destination: Path):
    """Copy a track's file into a directory and point the track at the copy.

    If a file with the same name already exists in the destination it is left
    as-is (not overwritten); the track's location is updated either way.

    Args:
        track: Track object.
        destination: Directory to copy tracks to.
    """
    source = track.get_location()
    target = destination / source.name
    already_present = target.exists()
    if not already_present:
        shutil.copyfile(source.as_posix(), target)
    track.set_location(target)

filter_tag_playlists(playlist, playlist_filters)

Applies a list of PlaylistFilter implementations to the playlist.

If the PlaylistFilter implementations' is_filter_playlist method evaluates to True, then the filter_track method is applied to each track in the playlist. The playlist's tracks are set to remove the tracks that have been filtered out.

Parameters:

Name Type Description Default
playlist Playlist

Playlist to potentially have its tracks filtered.

required
playlist_filters List[PlaylistFilter]

A list of PlaylistFilter implementations used to filter playlist tracks.

required
Source code in djtools/collection/helpers.py
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
def filter_tag_playlists(
    playlist: Playlist, playlist_filters: List[PlaylistFilter]
) -> None:
    """Run every applicable PlaylistFilter over a playlist tree, in place.

    Folders are descended into recursively. For a non-folder playlist, each
    filter whose ``is_filter_playlist`` method accepts the playlist is applied
    in list order: tracks for which ``filter_track`` is falsy are dropped and
    the survivors are written back with ``set_tracks``. Later filters see the
    result of earlier ones.

    Args:
        playlist: Playlist to potentially have its tracks filtered.
        playlist_filters: A list of PlaylistFilter implementations used to
            filter playlist tracks.
    """
    # A folder holds no tracks of its own here; filter each child instead.
    if playlist.is_folder():
        for child in playlist:
            filter_tag_playlists(child, playlist_filters)
        return

    for candidate in playlist_filters:
        if candidate.is_filter_playlist(playlist):
            # Re-read the tracks each time so filters compound.
            kept = {}
            for track_id, track in playlist.get_tracks().items():
                if candidate.filter_track(track):
                    kept[track_id] = track
            playlist.set_tracks(tracks=kept)

parse_expression(expression, tags_tracks)

Parses a boolean algebra expression by constructing a tree.

Parameters:

Name Type Description Default
expression str

String representing boolean algebra expression.

required
tags_tracks Dict[str, Dict[str, Track]]

Dict of tags to tracks.

required

Returns:

Type Description
Dict[str, Track]

Dict of track IDs and tracks.

Source code in djtools/collection/helpers.py
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
def parse_expression(
    expression: str, tags_tracks: Dict[str, Dict[str, Track]]
) -> Dict[str, Track]:
    """Evaluate a boolean algebra expression of tags into a dict of tracks.

    The expression is scanned one character at a time while a tree of
    ``BooleanNode`` objects is built: "(" opens a child node, ")" evaluates
    the current node and hands its result to the parent, operator characters
    (as decided by ``BooleanNode.is_operator``) terminate the operand being
    accumulated, and every other character is appended to that operand.

    Args:
        expression: String representing boolean algebra expression.
        tags_tracks: Dict of tags to tracks.

    Returns:
        Dict of track IDs and tracks.
    """
    current = BooleanNode(tags_tracks)
    # Characters of the operand currently being read. add_operand's return
    # value becomes the new accumulator (presumably an empty string --
    # confirm against BooleanNode).
    pending = ""
    for symbol in expression:
        # Opening parenthesis: descend into a new sub-expression.
        if symbol == "(":
            current = BooleanNode(tags_tracks, parent=current)
            continue

        # Operator: flush the operand read so far, then record the operator.
        if current.is_operator(symbol):
            pending = current.add_operand(pending)
            current.add_operator(symbol)
            continue

        # Ordinary character: it belongs to the operand being read.
        if symbol != ")":
            pending += symbol
            continue

        # Closing parenthesis: finish this sub-expression and pass its
        # (non-empty) result up to the parent as an operand.
        pending = current.add_operand(pending)
        group_tracks = current.evaluate()
        current = current.get_parent()
        if group_tracks:
            current.add_operand(group_tracks)

    # Flush whatever operand trails the last operator or parenthesis.
    current.add_operand(pending)

    return current.evaluate()

parse_numerical_selectors(numerical_matches, numerical_value_lookup)

Parses a string match of one or more numerical selectors.

Parameters:

Name Type Description Default
numerical_matches List[str]

List of numerical strings.

required
numerical_value_lookup Dict[Union[str, Tuple], str]

Empty dict to populate with tuples or strings mapping numerical ranges or values to their "tag" representation.

required

Returns:

Type Description
Set[str]

Set of numerical selector values.

Source code in djtools/collection/helpers.py
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
def parse_numerical_selectors(
    numerical_matches: List[str],
    numerical_value_lookup: Dict[Union[str, Tuple], str],
) -> Set[str]:
    """Expand numerical selectors into the individual values they cover.

    Each match is either a single number ("7") or an inclusive range written
    as two numbers joined by "-" ("2-4"; the bounds may be given in either
    order). Valid matches are also recorded in ``numerical_value_lookup``:
    a single number is keyed by its string, a range by the tuple of its
    expanded string values, and both map to ``"[<match>]"``.

    Args:
        numerical_matches: List of numerical strings.
        numerical_value_lookup: Empty dict to populate with tuples or strings
            mapping numerical ranges or values to their "tag" representation.

    Returns:
        Set of numerical selector values (as strings).
    """
    numerical_values = set()
    for match in numerical_matches:
        # A bare number is an explicit numerical value.
        if match.isdigit():
            numerical_values.add(match)
            lookup_key = match
        else:
            # Anything else must be exactly two numbers separated by "-".
            bounds = match.split("-")
            if len(bounds) != 2 or not all(b.isdigit() for b in bounds):
                logger.error(f"Malformed numerical selector: {match}")
                continue

            low, high = sorted(map(int, bounds))
            span = range(low, high + 1)

            # A range must sit entirely within one kind of value.
            within_one_kind = (
                all(0 <= x <= 5 for x in span)  # range for ratings
                or all(6 <= x <= 999 for x in span)  # range for BPMs
                or all(x >= 1000 for x in span)  # range for years
            )
            if not within_one_kind:
                logger.error(f"Bad numerical range selector: {match}")
                continue

            lookup_key = tuple(map(str, span))
            numerical_values.update(lookup_key)

        numerical_value_lookup[lookup_key] = f"[{match}]"

    return numerical_values

parse_string_selectors(string_matches, string_value_lookup, string_selector_type_map, playlists)

Parses a string match of one or more string selectors.

Parameters:

Name Type Description Default
string_matches List[str]

List of strings for string selectors.

required
string_value_lookup Dict[Union[str, Tuple], str]

Empty dict to populate with strings mapping string selectors to their "tag" representation.

required
string_selector_type_map Dict[str, str]

Maps a selector type to a Track method name.

required
playlists Set[str]

Set for storing playlist names.

required
Source code in djtools/collection/helpers.py
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
def parse_string_selectors(
    string_matches: List[str],
    string_value_lookup: Dict[Union[str, Tuple], str],
    string_selector_type_map: Dict[str, str],
    playlists: Set[str],
):
    """Parses a string match of one or more string selectors.

    Each match has the form ``"<type>:<value>"``. Only the first ":" separates
    the type from the value, so values may themselves contain colons. Matches
    without any ":" are logged and skipped.

    Playlist selectors add their value to ``playlists``. Other supported
    selectors are recorded in ``string_value_lookup`` keyed by
    ``(type, value)``; date selectors are keyed by
    ``(type, (inequality or None, datetime, format))`` instead. Every recorded
    entry maps to ``"{<match>}"``.

    Args:
        string_matches: List of strings for string selectors.
        string_value_lookup: Empty dict to populate with strings mapping string
            selectors to their "tag" representation.
        string_selector_type_map: Maps a selector type to a Track method name.
        playlists: Set for storing playlist names.
    """
    date_formats = ["%Y-%m-%d", "%Y-%m", "%Y"]

    for match in string_matches:
        # Split on the first ":" only; unpacking match.split(":") raised a
        # ValueError for values containing a colon or matches lacking one.
        selector_type, separator, selector_value = match.partition(":")
        if not separator:
            logger.warning(f"Malformed string selector: {match}")
            continue
        selector_type = selector_type.strip()
        selector_value = selector_value.strip()

        if selector_type == "playlist":
            playlists.add(selector_value)
            continue
        if not string_selector_type_map.get(selector_type):
            logger.warning(f"{selector_type} is not a supported selector!")
            continue
        if selector_type != "date":
            string_value_lookup[(selector_type, selector_value)] = (
                f"{{{match}}}"
            )
            continue

        dates, formats, inequalities = [], [], []
        skip_date_selector = False
        for part in filter(
            None, re.split(DATE_SELECTOR_REGEX, selector_value)
        ):
            date = None
            date_format = "%Y-%m-%d"

            # Note if the part is an inequality and move onto the next part.
            if re.search(DATE_SELECTOR_REGEX, part):
                inequalities.append(INEQUALITY_MAP[part])
                continue

            # The following part may be a timedelta string...
            date = parse_timedelta(part)

            # ...but if it wasn't, it's probably an ISO format date string.
            # Try the formats from most to least specific; date_format is left
            # holding the one that parsed.
            if not date:
                for date_format in date_formats:
                    try:
                        date = datetime.strptime(part, date_format)
                    except ValueError:
                        continue
                    break

            # If there's no date, then the selector wasn't formatted correctly.
            if not date:
                skip_date_selector = True
                break

            dates.append(date)
            formats.append(date_format)

        # A valid date selector has exactly one date and at most one
        # inequality.
        if (
            skip_date_selector
            or len(dates) != 1
            or (len(inequalities) not in [0, 1])
        ):
            logger.warning(f"Date selector {selector_value} is invalid!")
            continue

        string_value_lookup[
            (
                selector_type,
                (
                    None if not inequalities else inequalities[0],
                    dates[0],
                    formats[0],
                ),
            )
        ] = f"{{{match}}}"

parse_timedelta(time_str)

Parse a timedelta from a string and return the relative offset from now.

Supported units of time
  • years
  • months
  • weeks
  • days
Some example strings
  • 1y
  • 6m
  • 3m2w
  • 7d

Modified from peter's answer at https://stackoverflow.com/a/51916936

Parameters:

Name Type Description Default
time_str

A string identifying a duration.

required

Returns:

Type Description
Optional[datetime]

A datetime.datetime object.

Source code in djtools/collection/helpers.py
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
def parse_timedelta(time_str) -> Optional[datetime]:
    """
    Parse a duration string and return the moment that far before now.

    Supported units of time:
      - years
      - months
      - weeks
      - days

    Some example strings:
      - 1y
      - 6m
      - 3m2w
      - 7d

    Modified from peter's answer at https://stackoverflow.com/a/51916936

    Args:
        time_str: A string identifying a duration.

    Returns:
        A datetime.datetime object equal to the current local time minus the
        parsed duration, or None if the string does not match
        TIMEDELTA_REGEX.
    """
    matched = TIMEDELTA_REGEX.match(time_str)
    if not matched:
        return None

    # Keep only the units that were present in the string; the named groups
    # of the regex are passed straight through as relativedelta keyword
    # arguments.
    units = {}
    for unit, amount in matched.groupdict().items():
        if amount:
            units[unit] = float(amount)

    return datetime.now() - relativedelta(**units)

print_data(data)

Prints an ASCII histogram of tag data.

Parameters:

Name Type Description Default
data Dict[str, int]

Tag names to tag counts.

required
Source code in djtools/collection/helpers.py
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
def print_data(data: Dict[str, int]):
    """Prints an ASCII histogram of tag data.

    Entries with a falsy count are dropped first. The remaining counts are
    re-scaled with ``scale_data`` and drawn as one column of "*" per key, with
    the keys printed as labels under a horizontal rule. Nothing is printed
    when no entry has a non-zero count.

    Args:
        data: Tag names to tag counts.
    """
    data = {k: v for k, v in data.items() if v}

    # With nothing to plot there is no tallest column to start from, so bail
    # out instead of letting max() fail on an empty sequence.
    if not data:
        return

    scaled_data = scale_data(data)
    width_pad = 1
    top_row = max(scaled_data.values())

    # Draw from the tallest row down to row 1.
    rows = []
    for row in range(top_row, 0, -1):
        cells = []
        for key in data:
            # NOTE(review): round() rounds halves to even, so for odd-length
            # keys the column can be one character wider or narrower than its
            # label below. Left as-is to keep the output unchanged.
            side = " " * (width_pad + round(len(key) / 2))
            marker = "*" if row <= scaled_data[key] else " "
            cells.append(f"{side}{marker}{side}")
        rows.append("|" + "".join(cells))

    # The rule under the chart is as wide as a chart row.
    row_width = len(rows[0])
    labels = "".join(
        f"{' ' * width_pad}{key}{' ' * (width_pad + 1)}" for key in data
    )
    output = "".join(f"{line}\n" for line in rows)
    output += "-" * row_width + "\n " + labels
    print(output)

print_playlists_tag_statistics(combiner_playlists)

Prints tag statistics for Combiner playlists.

Statistics are split out by Combiner playlist and then by TagParser type.

Parameters:

Name Type Description Default
combiner_playlists Playlist

Playlist object for Combiner playlists.

required
Source code in djtools/collection/helpers.py
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
def print_playlists_tag_statistics(combiner_playlists: Playlist) -> None:
    """Print per-playlist tag histograms for a tree of Combiner playlists.

    Every non-folder playlist beneath ``combiner_playlists`` gets a header
    (when it has tracks) followed by up to two histograms: one for the genre
    tags of its tracks and one for all of their other tags.

    Args:
        combiner_playlists: Playlist object for Combiner playlists.
    """
    # Flatten the playlist tree into its non-folder playlists using an
    # explicit stack.
    leaves = []
    pending = [combiner_playlists]
    while pending:
        current = pending.pop()
        if not current.is_folder():
            leaves.append(current)
        else:
            pending.extend(current.get_playlists())

    for playlist in leaves:
        tracks = playlist.get_tracks()
        if tracks:
            print(f"\n{playlist.get_name()} tag statistics:")

        # Count how many tracks carry each tag, and sort the tags into genre
        # tags versus everything else.
        tag_counts = defaultdict(int)
        genres = set()
        others = set()
        for track in tracks.values():
            all_tags = track.get_tags()
            genre_subset = set(track.get_genre_tags())
            others |= set(all_tags) - genre_subset
            genres |= genre_subset
            for tag in all_tags:
                tag_counts[tag] += 1

        groups = (("Genre", sorted(genres)), ("Other", sorted(others)))
        for label, tags in groups:
            data = {tag: tag_counts[tag] for tag in tags}
            if data:
                print(f"\n{label}:")
                print_data(data)

scale_data(data, maximum=25)

Scales range of data values with an upper bound.

Parameters:

Name Type Description Default
data Dict[str, int]

Tag names to tag counts.

required
maximum Optional[int]

Upper bound for re-scaled data.

25

Returns:

Type Description
Dict[str, int]

Re-scaled dictionary of tag names and tag counts.

Source code in djtools/collection/helpers.py
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
def scale_data(
    data: Dict[str, int], maximum: Optional[int] = 25
) -> Dict[str, int]:
    """Scales range of data values with an upper bound.

    Each value is divided by the largest value, multiplied by ``maximum`` and
    rounded; results below 1 are raised to 1 so every key keeps a visible
    value.

    Args:
        data: Tag names to tag counts.
        maximum: Upper bound for re-scaled data.

    Returns:
        Re-scaled dictionary of tag names and tag counts. Empty input yields
        an empty dict; if the largest value is 0, every key maps to 1.
    """
    # max() of an empty sequence raises, so short-circuit.
    if not data:
        return {}

    data_max = max(data.values())

    # Avoid dividing by zero; fall back to the floor value of 1 that any
    # non-positive entry would receive anyway.
    if data_max == 0:
        return dict.fromkeys(data, 1)

    return {
        k: max(round((v / data_max) * maximum), 1) for k, v in data.items()
    }