Skip to content

Utils

This module contains the configuration object for the utils package. The attributes of this configuration object correspond with the "utils" key of config.yaml

UtilsConfig

Bases: BaseConfigFormatter

Configuration object for the utils package.

Source code in djtools/utils/config.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
class UtilsConfig(BaseConfigFormatter):
    """Configuration object for the utils package.

    The attributes of this object correspond with the "utils" key of
    config.yaml.
    """

    audio_bitrate: str = "320"
    audio_destination: Optional[Path] = None
    audio_format: AudioFormats = AudioFormats.MP3
    audio_headroom: NonNegativeFloat = 0.0
    check_tracks: bool = False
    check_tracks_fuzz_ratio: NonNegativeInt = 80
    check_tracks_spotify_playlists: List[str] = []
    local_dirs: List[Path] = []
    normalize_audio: bool = False
    process_recording: bool = False
    recording_file: Optional[Path] = None
    recording_playlist: str = ""
    trim_initial_silence: Union[int, TrimInitialSilenceMode] = 0
    url_download: str = ""

    def __init__(self, *args, **kwargs):
        """Constructor.

        Raises:
            RuntimeError: The AWS_PROFILE environment variable must be set
                for check_tracks.
            RuntimeError: recording_file must point to an existing file for
                process_recording.
            RuntimeError: recording_playlist must be set for
                process_recording.
        """

        super().__init__(*args, **kwargs)
        if self.check_tracks:
            if not os.environ.get("AWS_PROFILE"):
                raise RuntimeError(
                    "Without aws_profile set to a valid profile ('default' or "
                    "otherwise) you cannot use the check_tracks feature"
                )
            if self.check_tracks_spotify_playlists:
                logger.warning(
                    "check_tracks depends on valid Spotify API credentials in "
                    "SpotifyConfig."
                )

        if self.process_recording:
            # recording_file defaults to None, so it has to be checked before
            # calling .exists() or an AttributeError would mask the real
            # configuration problem.
            if self.recording_file is None or not self.recording_file.exists():
                raise RuntimeError(
                    f'Could not find recording_file "{self.recording_file}"'
                )
            if not self.recording_playlist:
                raise RuntimeError(
                    "You must provide a playlist name as recording_playlist "
                    "and this name must exists in spotify_playlists.yaml."
                )

    @field_validator("audio_bitrate")
    @classmethod
    def bitrate_validation(cls, value: str) -> str:
        """Validates audio_bitrate is in the range and casts it to a string.

        Args:
            value: audio_bitrate field

        Raises:
            ValueError: audio_bitrate must be in the range [36, 320]

        Returns:
            String representing the bit rate.
        """
        value = int(value)
        if value < 36 or value > 320:
            raise ValueError("audio_bitrate must be in the range [36, 320]")

        return str(value)

    @model_validator(mode="after")
    @classmethod
    def format_validation(cls, model: "UtilsConfig") -> "UtilsConfig":
        """Logs a warning message to install FFmpeg if audio_format isn't wav.

        The warning is only logged when normalize_audio or process_recording
        is enabled.

        Args:
            model: The validated model instance.

        Returns:
            The validated model instance.
        """
        if model.audio_format != "wav" and (
            model.normalize_audio or model.process_recording
        ):
            logger.warning(
                "You must install FFmpeg in order to use non-wav file formats."
            )

        return model

__init__(*args, **kwargs)

Constructor.

Raises:

Type Description
RuntimeError

aws_profile must be set for check_tracks.

Source code in djtools/utils/config.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def __init__(self, *args, **kwargs):
    """Constructor.

    Raises:
        RuntimeError: The AWS_PROFILE environment variable must be set for
            check_tracks.
        RuntimeError: recording_file must point to an existing file for
            process_recording.
        RuntimeError: recording_playlist must be set for process_recording.
    """

    super().__init__(*args, **kwargs)
    if self.check_tracks:
        if not os.environ.get("AWS_PROFILE"):
            raise RuntimeError(
                "Without aws_profile set to a valid profile ('default' or "
                "otherwise) you cannot use the check_tracks feature"
            )
        if self.check_tracks_spotify_playlists:
            logger.warning(
                "check_tracks depends on valid Spotify API credentials in "
                "SpotifyConfig."
            )

    if self.process_recording:
        # recording_file may be None (its default); check that first so the
        # user gets the RuntimeError below rather than an AttributeError.
        if self.recording_file is None or not self.recording_file.exists():
            raise RuntimeError(
                f'Could not find recording_file "{self.recording_file}"'
            )
        if not self.recording_playlist:
            raise RuntimeError(
                "You must provide a playlist name as recording_playlist "
                "and this name must exists in spotify_playlists.yaml."
            )

bitrate_validation(value) classmethod

Validates audio_bitrate is in the range and casts it to a string.

Parameters:

Name Type Description Default
value str

audio_bitrate field

required

Raises:

Type Description
ValueError

audio_bitrate must be in the range [36, 320]

Returns:

Type Description
str

String representing the bit rate.

Source code in djtools/utils/config.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
@field_validator("audio_bitrate")
@classmethod
def bitrate_validation(cls, value: str) -> str:
    """Checks that audio_bitrate lies within bounds and returns it as a string.

    Args:
        value: audio_bitrate field

    Raises:
        ValueError: audio_bitrate must be in the range [36, 320]

    Returns:
        String representing the bit rate.
    """
    bitrate = int(value)
    if not 36 <= bitrate <= 320:
        raise ValueError("audio_bitrate must be in the range [36, 320]")

    return str(bitrate)

format_validation(model) classmethod

Logs a warning message to install FFmpeg if audio_format isn't wav.

Parameters:

Name Type Description Default
model UtilsConfig

The validated model instance.

required

Returns:

Type Description
UtilsConfig

The validated model instance.

Source code in djtools/utils/config.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
@model_validator(mode="after")
@classmethod
def format_validation(cls, model: "UtilsConfig") -> "UtilsConfig":
    """Warns that FFmpeg is required when a non-wav audio_format is in use.

    The warning is only emitted when normalize_audio or process_recording is
    enabled; the model itself is never modified.

    Args:
        model: The validated model instance.

    Returns:
        The validated model instance.
    """
    is_wav = not model.audio_format != "wav"
    if is_wav:
        return model

    if model.normalize_audio or model.process_recording:
        logger.warning(
            "You must install FFmpeg in order to use non-wav file formats."
        )

    return model

This module is used to compare tracks from Spotify playlists and / or local directories to see if there is any overlap with the contents of the Beatcloud.

compare_tracks(config, beatcloud_tracks=None)

Compares tracks from Spotify / local with Beatcloud tracks.

Gets track titles and artists from Spotify playlist(s) and / or file names from local directories, and gets file names from the Beatcloud. Then computes the Levenshtein similarity between each pair in their product in order to identify any overlapping tracks.

Parameters:

Name Type Description Default
config BaseConfig

Configuration object.

required
beatcloud_tracks Optional[List[str]]

Cached list of tracks from S3.

None

Returns:

Type Description
Tuple[List[str], List[str]]

Tuple with a list of all Beatcloud tracks and list of full paths to matching Beatcloud tracks.

Source code in djtools/utils/check_tracks.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
def compare_tracks(
    config: BaseConfig,
    beatcloud_tracks: Optional[List[str]] = None,
) -> Tuple[List[str], List[str]]:
    """Compares tracks from Spotify / local with Beatcloud tracks.

    Gets track titles and artists from Spotify playlist(s) and / or file names
    from local directories, and gets file names from the beatcloud. Then
    computes the Levenshtein similarity between their product in order to
    identify any overlapping tracks.

    When config.sync.download_spotify_playlist is set, only that playlist is
    compared and local_dirs is ignored. The configuration object is never
    modified.

    Args:
        config: Configuration object.
        beatcloud_tracks: Cached list of tracks from S3.

    Returns:
        Tuple with a list of all Beatcloud tracks and list of full paths to
            matching Beatcloud tracks. The first item is returned as it was
            passed in (possibly None) when there is nothing to compare.
    """
    download_playlist = config.sync.download_spotify_playlist
    if download_playlist:
        # Decide locally which sources to check rather than temporarily
        # emptying config.utils.local_dirs; that way an exception raised
        # while fetching tracks cannot leave the config in a mutated state.
        spotify_playlists = [download_playlist]
        local_dirs = []
    else:
        spotify_playlists = config.utils.check_tracks_spotify_playlists
        local_dirs = config.utils.local_dirs

    track_sets = []
    beatcloud_matches = []
    if spotify_playlists:
        spotify_tracks = get_spotify_tracks(config, spotify_playlists)
        if not spotify_tracks:
            if download_playlist:
                substring = "download_spotify_playlist is a key"
            else:
                substring = (
                    "check_tracks_spotify_playlists has one or more keys"
                )
            logger.warning(
                f"There are no Spotify tracks; make sure {substring} in "
                "spotify_playlists.yaml"
            )
        else:
            track_results = defaultdict(list)
            for playlist_name, playlist_tracks in spotify_tracks.items():
                for track in playlist_tracks:
                    title = track["track"]["name"]
                    artists = ", ".join(
                        [y["name"] for y in track["track"]["artists"]]
                    )
                    track_results[playlist_name].append(
                        f"{artists} - {title}"
                        if config.sync.artist_first
                        else f"{title} - {artists}"
                    )
            track_sets.append((track_results, "Spotify Playlist Tracks"))
    if local_dirs:
        local_tracks = get_local_tracks(config)
        if not local_tracks:
            logger.warning(
                "There are no local tracks; make sure local_dirs has one or "
                "more directories containing one or more tracks"
            )
        else:
            track_results = {
                key: [track.stem for track in value]
                for key, value in local_tracks.items()
            }
            track_sets.append((track_results, "Local Directory Tracks"))

    if not track_sets:
        return beatcloud_tracks, beatcloud_matches

    if not beatcloud_tracks:
        beatcloud_tracks = get_beatcloud_tracks(config.sync.bucket_url)

    # Map each Beatcloud file stem back to its full path.
    path_lookup = {x.stem: x for x in beatcloud_tracks}

    by_location = itemgetter(0)
    for tracks, track_type in track_sets:
        if config.sync.artist_first and track_type == "Local Directory Tracks":
            path_lookup = reverse_title_and_artist(path_lookup)
        matches = find_matches(
            tracks,
            path_lookup.keys(),
            config,
        )
        logger.info(f"\n{track_type} / Beatcloud Matches: {len(matches)}")
        # groupby only groups adjacent items, hence the sort on the same key.
        # The group gets its own name so "matches" is not rebound mid-loop.
        for loc, group in groupby(
            sorted(matches, key=by_location), key=by_location
        ):
            logger.info(f"{loc}:")
            for _, track, beatcloud_track, fuzz_ratio in group:
                beatcloud_matches.append(path_lookup[beatcloud_track])
                logger.info(f"\t{fuzz_ratio}: {track} | {beatcloud_track}")

    return beatcloud_tracks, beatcloud_matches

This module is used to normalize audio files in one or more directories.

normalize(config)

Gets local tracks and normalizes them.

Tracks will be overwritten and have a headroom equal to audio_headroom.

Parameters:

Name Type Description Default
config BaseConfig

Configuration object.

required

Raises:

Type Description
RuntimeError

Must have local tracks to normalize.

Source code in djtools/utils/normalize_audio.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def normalize(config: BaseConfig):
    """Normalizes every local track to the configured headroom.

    Files under local_dirs are decoded one at a time; any whose peak level is
    not already within 0.001 dB of the target headroom are normalized and
    exported next to the original using the configured audio_format and
    audio_bitrate. Files that cannot be decoded are logged and skipped.

    Args:
        config: Configuration object.

    Raises:
        RuntimeError: Must have local tracks to normalize.
    """
    tracks_by_dir = get_local_tracks(config)
    if not tracks_by_dir:
        raise RuntimeError(
            "There are no local tracks; make sure local_dirs has one or "
            "more directories containing one or more tracks"
        )

    # Collect the candidates up front: regular, non-hidden files only.
    candidates = []
    for dir_tracks in tracks_by_dir.values():
        for path in dir_tracks:
            if path.is_file() and not path.name.startswith("."):
                candidates.append(path)

    for track in candidates:
        try:
            audio = AudioSegment.from_file(track)
        except Exception as exc:
            logger.error(f"Couldn't decode {track}: {exc}")
            continue

        # Skip tracks whose peak already sits at the requested headroom.
        needs_normalizing = (
            abs(audio.max_dBFS + config.utils.audio_headroom) > 0.001
        )
        if not needs_normalizing:
            continue

        logger.info(
            f"{track} has a max dB of {audio.max_dBFS}, normalizing to "
            f"have a headroom of {config.utils.audio_headroom}..."
        )

        # Carry the existing tags over to the exported file when possible.
        try:
            tags = utils.mediainfo(track).get("TAG", {})
        except FileNotFoundError as exc:
            logger.warning(
                f"Couldn't export {track.stem} with ID3 tags; ensure "
                f'"ffmpeg" is installed: {exc}'
            )
            tags = {}

        normalized = effects.normalize(
            audio, headroom=config.utils.audio_headroom
        )
        destination = (
            track.parent / f"{track.stem}.{config.utils.audio_format.value}"
        )
        normalized.export(
            destination,
            tags=tags,
            bitrate=f"{config.utils.audio_bitrate}k",
            format=config.utils.audio_format.value,
        )

This module is used to process an audio recording.

Given a recording of multiple tracks and a Spotify playlist, use the information from the Spotify API to:

  • split the recording into individual files
  • name these files with the title and artist(s)
  • populate the title, artist, and album tags
  • normalize the audio so the headroom is audio_headroom decibels
  • export the files with the configured audio_bitrate and audio_format

process(config)

Process a recording whose contents map to tracks in a Spotify playlist.

Parameters:

Name Type Description Default
config BaseConfig

Configuration object.

required

Raises:

Type Description
RuntimeError

The configured recording_playlist must both exist in spotify_playlists.yaml and have tracks in it.

Source code in djtools/utils/process_recording.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def process(config: BaseConfig):
    """Process a recording whose contents map to tracks in a Spotify playlist.

    The recording is cut into consecutive chunks using the track durations
    reported by Spotify (plus a 500 ms gap per track). Each chunk is passed to
    process_parallel together with its tag data and the destination
    directory. The files are written to audio_destination, or to the current
    directory when audio_destination is not set.

    Args:
        config: Configuration object.

    Raises:
        RuntimeError: The configured recording_playlist must both exist
            in spotify_playlists.yaml and have tracks in it.
    """
    # Local import: only needed for the destination fallback below.
    from pathlib import Path

    # Get the tracks of the target Spotify playlist.
    playlist_name = config.utils.recording_playlist
    tracks = get_spotify_tracks(config, [playlist_name])
    if not tracks or not tracks.get(playlist_name):
        raise RuntimeError(
            "There are no Spotify tracks; make sure recording_playlist "
            "is a key from spotify_playlists.yaml and that the playlist has "
            "one or more tracks"
        )

    # strptime formats keyed by Spotify's release_date_precision values.
    date_formats = {"year": "%Y", "month": "%Y-%m", "day": "%Y-%m-%d"}

    # Parse the relevant data from the track responses.
    track_data = []
    playlist_duration = 0
    for track in tracks[playlist_name]:
        album = track["track"]["album"]

        # Parse release date field based on the date precision; the year is
        # left as an empty string for an unrecognized precision.
        date_format = date_formats.get(album["release_date_precision"])
        date_year = (
            datetime.strptime(album["release_date"], date_format).year
            if date_format
            else ""
        )

        # TODO(a-rich): Why won't Rekordbox load "label" and "year" tags?!
        data = {
            "album": album["name"],
            "artist": ", ".join(
                [y["name"] for y in track["track"]["artists"]]
            ),
            # NOTE: There's a 500 ms gap between tracks during playback that
            # must be accounted for.
            "duration": track["track"]["duration_ms"] + 500,
            "label": album.get("label", ""),
            "title": track["track"]["name"],
            "year": date_year,
        }
        track_data.append(data)
        playlist_duration += data["duration"]

    # Load the audio and trim the initial silence.
    logger.info("Loading audio...")
    audio = AudioSegment.from_file(config.utils.recording_file)
    if config.utils.trim_initial_silence:
        audio = trim_initial_silence(
            audio,
            [track["duration"] for track in track_data],
            config.utils.trim_initial_silence,
        )

    # Check that the audio is at least as long as the playlist duration.
    audio_duration = len(audio)
    if audio_duration < playlist_duration:
        logger.warning(
            f"{config.utils.recording_file} has a duration of {audio_duration} "
            "milliseconds which is less than the sum of track lengths in the "
            f"Spotify playlist {config.utils.recording_playlist} which is "
            f"{playlist_duration} milliseconds. Please confirm your recording "
            "went as expected!"
        )

    # Create destination for exported audio. audio_destination is optional,
    # so fall back to the current directory like url_download does.
    write_path = config.utils.audio_destination or Path(".")
    write_path.mkdir(parents=True, exist_ok=True)

    # Split recording into the individual tracks.
    audio_chunks = []
    for track in track_data:
        track_audio = audio[: track["duration"]]
        audio = audio[track["duration"] :]
        audio_chunks.append(track_audio)

    # Normalize audio and export tracks with tags.
    payload = zip(
        [config] * len(audio_chunks),
        audio_chunks,
        track_data,
        [write_path] * len(audio_chunks),
    )

    # os.cpu_count() may return None when the count cannot be determined.
    with ThreadPoolExecutor(
        max_workers=(os.cpu_count() or 1) * 4  # pylint: disable=no-member
    ) as executor:
        exported_files = list(
            executor.map(lambda args: process_parallel(*args), payload)
        )

    # Stamp each file one second after the previous one, in playlist order.
    for index, filepath in enumerate(exported_files):
        creation_time = datetime.now().timestamp() + index
        os.utime(filepath, (creation_time, creation_time))

This module is used to download tracks from "url_download". For example, a Soundcloud playlist can be made and the URL of that playlist can be provided to download all those tracks and rename them to clean up the digits appended to the files by the youtube-dl package.

fix_up(_file)

Removes digits appended to file name by youtube-dl.

Parameters:

Name Type Description Default
_file Path

Music file name.

required

Returns:

Type Description
Path

Cleaned up music file name.

Source code in djtools/utils/url_download.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@make_path
def fix_up(_file: Path) -> Path:
    """Removes digits appended to file name by youtube-dl.

    A trailing "-<digits>" is stripped from the file stem and the " - "
    separated parts of what remains are reversed. Only the file name is
    returned; any parent directories of _file are dropped.

    Args:
        _file: Music file name.

    Returns:
        Cleaned up music file name.
    """
    # Match against the stem only. The suffix is never interpolated into the
    # pattern (its "." would act as a wildcard), and dots inside the title
    # (e.g. "Mr. X") cannot be mistaken for a file extension.
    stripped = re.sub(r"-\d+\Z", "", _file.stem)
    name = " - ".join(reversed(stripped.split(" - ")))

    # Append the suffix textually; with_suffix() would replace everything
    # after the last dot of a dotted title.
    return Path(f"{name}{_file.suffix}")

url_download(config)

Downloads music files from a provided URL using the youtube-dl package.

Parameters:

Name Type Description Default
config BaseConfig

Configuration object.

required
Source code in djtools/utils/url_download.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def url_download(config: BaseConfig):
    """Downloads music files from a provided URL using the youtube-dl package.

    Files are downloaded into audio_destination (or the current directory when
    it is not set), converted to the configured audio_format and
    audio_bitrate, and then every entry of that directory is renamed with
    fix_up.

    Args:
        config: Configuration object.
    """
    dl_loc = config.utils.audio_destination or Path(".")
    dl_loc.mkdir(parents=True, exist_ok=True)

    ydl_opts = {
        "postprocessors": [
            {
                "key": "FFmpegExtractAudio",
                "preferredcodec": config.utils.audio_format.value,
                "preferredquality": config.utils.audio_bitrate,
            }
        ],
        "outtmpl": (dl_loc / "%(title)s.tmp").as_posix(),
    }

    with ytdl.YoutubeDL(ydl_opts) as ydl:
        logger.info(f"Downloading {config.utils.url_download} to {dl_loc}")
        ydl.download([config.utils.url_download])

    for _file in dl_loc.iterdir():
        # iterdir() already yields paths prefixed with dl_loc; joining them
        # onto dl_loc again breaks for relative destinations such as "out".
        _file.rename(dl_loc / fix_up(_file))

This module contains helper functions that are not specific to any particular sub-package of this library.

compute_distance(spotify_playlist, spotify_track, beatcloud_track, threshold)

Qualifies a match between a Spotify track and a beatcloud track using Levenshtein similarity.

Parameters:

Name Type Description Default
spotify_playlist str

Playlist that Spotify track belongs to.

required
spotify_track str

Spotify track title and artist name.

required
beatcloud_track str

Beatcloud track title and artist name

required
threshold float

Levenshtein similarity threshold for acceptance.

required

Returns:

Type Description
Tuple[str, float]

Tuple of Spotify playlist, Spotify "TRACK TITLE - ARTIST NAME", beatcloud "TRACK TITLE - ARTIST NAME", Levenshtein similarity.

Source code in djtools/utils/helpers.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def compute_distance(
    spotify_playlist: str,
    spotify_track: str,
    beatcloud_track: str,
    threshold: float,
) -> Tuple[str, float]:
    """Scores a Spotify / beatcloud track pair and keeps it if it qualifies.

    Args:
        spotify_playlist: Playlist that Spotify track belongs to.
        spotify_track: Spotify track title and artist name.
        beatcloud_track: Beatcloud track title and artist name
        threshold: Levenshtein similarity threshold for acceptance.

    Returns:
        Tuple of Spotify playlist, Spotify "TRACK TITLE - ARTIST NAME",
            beatcloud "TRACK TITLE - ARTIST NAME", Levenshtein similarity;
            an empty tuple when the similarity is below the threshold.
    """
    similarity = fuzz.ratio(spotify_track, beatcloud_track)
    if similarity >= threshold:
        return spotify_playlist, spotify_track, beatcloud_track, similarity

    return ()

find_matches(compare_tracks, beatcloud_tracks, config)

Computes the Levenshtein similarity between beatcloud tracks and the given tracks to compare with, and returns those that match above a threshold.

Parameters:

Name Type Description Default
compare_tracks Dict[str, Set[str]]

Dictionary with either local directory or Spotify playlist keys and filenames or title and artists values.

required
beatcloud_tracks List[str]

Beatcloud track titles and artist names.

required
config BaseConfig

Configuration object.

required

Returns:

Type Description
List[Tuple[str, float]]

List of tuples of track location (directory or playlist), track name, Beatcloud track, and Levenshtein similarity.

Source code in djtools/utils/helpers.py
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def find_matches(
    compare_tracks: Dict[str, Set[str]],
    beatcloud_tracks: List[str],
    config: BaseConfig,
) -> List[Tuple[str, float]]:
    """Computes the Levenshtein similarity between beatcloud tracks and the
        given tracks to compare with and returns those that match above a
        threshold.

    Args:
        compare_tracks: Dictionary with either local directory or Spotify
            playlist keys and filenames or title and artists values.
        beatcloud_tracks: Beatcloud track titles and artist names.
        config: Configuration object.

    Returns:
        List of tuples of track location (directory or playlist), track name,
            Beatcloud track, and Levenshtein similarity. The list is empty
            when there are no pairs to compare.
    """
    playlist_tracks = [
        (playlist, track)
        for playlist, tracks in compare_tracks.items()
        for track in tracks
    ]
    threshold = config.utils.check_tracks_fuzz_ratio

    # One argument tuple per (track, beatcloud track) pair. Building them
    # directly avoids unpacking zip(*...), which raises ValueError when
    # either side is empty.
    payload = [
        (location, track, beatcloud_track, threshold)
        for (location, track), beatcloud_track in product(
            playlist_tracks, beatcloud_tracks
        )
    ]
    if not payload:
        return []

    # os.cpu_count() may return None when the count cannot be determined.
    with ThreadPoolExecutor(
        max_workers=(os.cpu_count() or 1) * 4  # pylint: disable=no-member
    ) as executor:
        futures = [
            executor.submit(compute_distance, *args) for args in payload
        ]

        with tqdm(
            total=len(futures), desc="Matching new tracks and Beatcloud tracks"
        ) as pbar:
            matches = []
            for future in as_completed(futures):
                # compute_distance returns an empty tuple for non-matches.
                result = future.result()
                if result:
                    matches.append(result)
                pbar.update(1)

    return matches

get_beatcloud_tracks(bucket_url)

Lists all the music files in S3 and parses out the track titles and artist names.

Parameters:

Name Type Description Default
bucket_url

URL to an AWS S3 API compliant bucket.

required

Returns:

Type Description
List[str]

Beatcloud track titles and artist names.

Source code in djtools/utils/helpers.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def get_beatcloud_tracks(bucket_url) -> List[str]:
    """Lists the music files stored under dj/music/ in the bucket.

    Runs "aws s3 ls --recursive" and wraps every non-empty output line in a
    Path.

    Args:
        bucket_url: URL to an AWS S3 API compliant bucket.

    Returns:
        Beatcloud track titles and artist names.
    """
    listing = check_output(
        ["aws", "s3", "ls", "--recursive", f"{bucket_url}/dj/music/"]
    )
    lines = listing.decode("utf-8").split("\n")
    tracks = []
    for line in lines:
        if line:
            tracks.append(Path(line))
    logger.info(f"Got {len(tracks)} tracks from the beatcloud")

    return tracks

get_local_tracks(config)

Aggregates the files from one or more local directories in a dictionary mapped with parent directories.

Parameters:

Name Type Description Default
config BaseConfig

Configuration object.

required

Returns:

Type Description
Dict[str, List[str]]

Local file names keyed by parent directory.

Source code in djtools/utils/helpers.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def get_local_tracks(config: BaseConfig) -> Dict[str, List[str]]:
    """Collects the files found under each configured local directory.

    Directories that do not exist are reported with a warning and skipped;
    directories without any matching files are left out of the result.

    Args:
        config: Configuration object.

    Returns:
        Local file names keyed by parent directory.
    """
    tracks_by_dir = {}
    for directory in config.utils.local_dirs:
        if directory.exists():
            found = list(directory.rglob("**/*.*"))
            if found:
                tracks_by_dir[directory] = found
        else:
            logger.warning(
                f"{directory} does not exist; will not be able to check its "
                "contents against the beatcloud"
            )

    local_tracks_count = sum(map(len, tracks_by_dir.values()))
    logger.info(f"Got {local_tracks_count} files under local directories")

    return tracks_by_dir

get_playlist_tracks(spotify, playlist_id)

Queries Spotify API for a playlist and pulls tracks from it.

Parameters:

Name Type Description Default
spotify Spotify

Spotify client.

required
playlist_id str

Playlist ID of Spotify playlist to pull tracks from.

required

Raises:

Type Description
RuntimeError

Playlist_id must correspond with a valid Spotify playlist.

Returns:

Type Description
List[Dict]

List of Spotify track results.

Source code in djtools/utils/helpers.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
def get_playlist_tracks(
    spotify: spotipy.Spotify, playlist_id: str
) -> List[Dict]:
    """Queries Spotify API for a playlist and pulls tracks from it.

    Follows the "next" links of the paginated response until every page of
    tracks has been collected.

    Args:
        spotify: Spotify client.
        playlist_id: Playlist ID of Spotify playlist to pull tracks from.

    Raises:
        RuntimeError: Playlist_id must correspond with a valid Spotify playlist.

    Returns:
        List of Spotify track results.
    """
    try:
        playlist = spotify.playlist(playlist_id)
    except Exception as exc:
        # Chain from the caught exception (not the Exception class) so the
        # real cause is preserved in the traceback.
        raise RuntimeError(
            f"Failed to get playlist with ID {playlist_id}"
        ) from exc

    result = playlist["tracks"]
    tracks = list(result["items"])
    while result["next"]:
        result = spotify.next(result)
        tracks.extend(result["items"])

    return tracks

get_spotify_tracks(config, playlists)

Aggregates the tracks from one or more Spotify playlists into a dictionary mapped with playlist names.

Parameters:

Name Type Description Default
config BaseConfig

Configuration object.

required
playlists List[str]

List of Spotify playlist names.

required

Returns:

Type Description
Dict[str, List[Dict]]

Spotify tracks keyed by playlist name.

Source code in djtools/utils/helpers.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
def get_spotify_tracks(
    config: BaseConfig, playlists: List[str]
) -> Dict[str, List[Dict]]:
    """Fetches the tracks of each named Spotify playlist.

    Playlist names without an ID from get_playlist_ids() are logged as
    errors and skipped. Individual tracks are logged when config.verbosity
    is greater than zero.

    Args:
        config: Configuration object.
        playlists: List of Spotify playlist name.

    Returns:
        Spotify tracks keyed by playlist name.
    """
    client = get_spotify_client(config)
    ids_by_name = get_playlist_ids()

    tracks_by_playlist = {}
    _sum = 0
    for playlist in playlists:
        playlist_id = ids_by_name.get(playlist)
        if not playlist_id:
            logger.error(f"{playlist} not in spotify_playlists.yaml")
            continue

        fetched = get_playlist_tracks(client, playlist_id)
        tracks_by_playlist[playlist] = fetched
        length = len(fetched)
        _sum += length
        logger.info(
            f'Got {length} track{"" if length == 1 else "s"} from Spotify '
            f'playlist "{playlist}"'
        )

        if config.verbosity > 0:
            for track in fetched:
                logger.info(f"\t{track}")

    logger.info(
        f'Got {_sum} track{"" if _sum == 1 else "s"} from Spotify in total'
    )

    return tracks_by_playlist

initialize_logger()

Initializes logger from configuration.

Returns:

Type Description
Tuple[Logger, str]

Tuple containing Logger and associated log file.

Source code in djtools/utils/helpers.py
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
def initialize_logger() -> Tuple[logging.Logger, Path]:
    """Initializes logger from configuration.

    Configures the root logger with two DEBUG-level handlers sharing one
    format: a file handler writing to a date-stamped file under the "logs"
    directory two levels above this module, and a stream handler writing to
    stdout. The "logs" directory is created if it does not exist yet.

    Returns:
        Tuple containing Logger and associated log file (as a Path).
    """
    log_file = (
        Path(__file__).parent.parent
        / "logs"
        / f'{datetime.now().strftime("%Y-%m-%d")}.log'
    )
    # logging.FileHandler opens its file as soon as it is constructed and
    # does not create missing parent directories, so make sure the target
    # directory exists before the configuration is applied.
    log_file.parent.mkdir(parents=True, exist_ok=True)

    logging_config = {
        "version": 1,
        # Leave loggers created before this call enabled.
        "disable_existing_loggers": False,
        "formatters": {
            "baseFormatter": {
                "format": "%(asctime)s - %(name)s:%(lineno)s - %(levelname)s - %(message)s",
                "datefmt": "%Y-%m-%d %H:%M:%S",
            },
        },
        "handlers": {
            "fileHandler": {
                "class": "logging.FileHandler",
                "level": "DEBUG",
                "formatter": "baseFormatter",
                "filename": log_file.as_posix(),
            },
            "streamHandler": {
                "class": "logging.StreamHandler",
                "level": "DEBUG",
                "formatter": "baseFormatter",
                "stream": "ext://sys.stdout",
            },
        },
        "loggers": {
            "": {  # root logger
                "handlers": ["fileHandler", "streamHandler"],
                "level": "DEBUG",
                "propagate": False,
            },
        },
    }
    logging.config.dictConfig(logging_config)

    return logging.getLogger(__name__), log_file

make_path(func)

Decorator for converting Path-typed args to Paths.

Parameters:

Name Type Description Default
func Callable

Callable being decorated with this function.

required

Raises:

Type Description
RuntimeError

args annotated with a pathlib.Path need to be able to have Paths created from them.

RuntimeError

kwargs annotated with a pathlib.Path need to be able to have Paths created from them.

Returns:

Type Description
Callable

The Callable being wrapped by this decorator.

Source code in djtools/utils/helpers.py
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
def make_path(func: Callable) -> Callable:
    """Decorator for converting Path-typed args to Paths.

    Every positional or keyword argument whose parameter is annotated with
    pathlib.Path or Optional[pathlib.Path] is converted to a Path before the
    wrapped callable is invoked. Arguments that already are Paths, and an
    explicit None for an Optional[pathlib.Path] parameter, are passed through
    untouched.

    Args:
        func: Callable being decorated with this function.

    Raises:
        RuntimeError: args annotated with a pathlib.Path need to be able to
            have Paths created from them.
        RuntimeError: kwargs annotated with a pathlib.Path need to be able to
            have Paths created from them.

    Returns:
        The Callable being wrapped by this decorator.
    """

    # Copy the metadata of the decorated callable (not of this decorator) so
    # that the wrapper keeps the callable's name, docstring and signature.
    @wraps(func)
    def str_to_path(*args, **kwargs):
        """Converts non-Path type args into Paths if annotated as Paths.

        Raises:
            RuntimeError: args annotated with a pathlib.Path need to be able to
                have Paths created from them.
            RuntimeError: kwargs annotated with a pathlib.Path need to be able
                to have Paths created from them.
        """
        optional_path = typing.Union[pathlib.Path, None]
        path_types = (pathlib.Path, optional_path)

        # Resolve annotations by parameter name. Matching by name (instead of
        # by position in the annotation list) keeps unannotated parameters,
        # "self" and the "return" annotation from shifting the alignment.
        type_hints = typing.get_type_hints(func)
        positional_hints = []
        keyword_names = set()
        var_positional_hint = None
        var_keyword_hint = None
        for parameter in inspect.signature(func).parameters.values():
            hint = type_hints.get(parameter.name)
            if parameter.kind is inspect.Parameter.VAR_POSITIONAL:
                var_positional_hint = hint
            elif parameter.kind is inspect.Parameter.VAR_KEYWORD:
                var_keyword_hint = hint
            else:
                if parameter.kind is not inspect.Parameter.KEYWORD_ONLY:
                    positional_hints.append(hint)
                if parameter.kind is not inspect.Parameter.POSITIONAL_ONLY:
                    keyword_names.add(parameter.name)

        def needs_conversion(value, hint):
            # Skip if the value shouldn't be a path or it should be a Path
            # but already is.
            if hint not in path_types or isinstance(value, Path):
                return False
            # None is a legitimate value for an Optional[Path] parameter.
            return not (value is None and hint == optional_path)

        # Convert each arg to a Path if the annotation type is pathlib.Path.
        # Surplus positional args fall under the *args annotation, if any.
        args = list(args)
        for index, arg in enumerate(args):
            if index < len(positional_hints):
                arg_type = positional_hints[index]
            else:
                arg_type = var_positional_hint
            if not needs_conversion(arg, arg_type):
                continue

            try:
                args[index] = Path(arg)
            except Exception as exc:
                raise RuntimeError(
                    "Error creating Path in function "
                    f'"{func.__name__}" from positional arg "{arg}" annotated '
                    f'with type "{arg_type}": {exc}'
                ) from exc
        args = tuple(args)

        # Convert each kwarg to a Path if the annotation type is pathlib.Path.
        # Keys that don't name a parameter fall under the **kwargs annotation.
        for key, value in list(kwargs.items()):
            if key in keyword_names:
                arg_type = type_hints.get(key)
            else:
                arg_type = var_keyword_hint
            if not needs_conversion(value, arg_type):
                continue

            try:
                kwargs[key] = Path(value)
            except Exception as exc:
                raise RuntimeError(
                    "Error creating Path in function "
                    f'"{func.__name__}" from keyword arg "{key}={value}" '
                    f'annotated with type "{arg_type}": {exc}'
                ) from exc

        return func(*args, **kwargs)

    return str_to_path

process_parallel(config, audio, track, write_path)

Normalize and export track with tags.

Parameters:

Name Type Description Default
config BaseConfig

Configuration object.

required
audio AudioSegment

Audio for a track.

required
track Dict

Metadata for track audio.

required
write_path Path

Destination for exported audio.

required

Returns:

Type Description
Path

Path that the file was written to.

Source code in djtools/utils/helpers.py
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
def process_parallel(
    config: BaseConfig, audio: AudioSegment, track: Dict, write_path: Path
) -> Path:
    """Normalizes a track's audio and exports it with its tags.

    Args:
        config: Configuration object.
        audio: Audio for a track.
        track: Metadata for track audio.
        write_path: Destination for exported audio.

    Returns:
        Path that the file was written to.
    """
    # Only normalize when the current peak is not already (within a small
    # tolerance) audio_headroom dB below full scale.
    headroom = config.utils.audio_headroom
    if abs(audio.max_dBFS + headroom) > 0.001:
        audio = effects.normalize(audio, headroom=headroom)

    # The configured ordering decides whether artist or title leads the name.
    if config.sync.artist_first:
        stem = f'{track["artist"]} - {track["title"]}'
    else:
        stem = f'{track["title"]} - {track["artist"]}'
    audio_format = config.utils.audio_format.value
    destination = write_path / f"{stem}.{audio_format}"

    # More than one " - " in the path makes the title / artist split
    # ambiguous for other features of djtools, so warn the user about it.
    if str(destination).count(" - ") > 1:
        logger.warning(
            f'{destination} has more than one occurrence of " - "! '
            "Because djtools splits on this sequence of characters to "
            "separate track title and artist(s), you might get unexpected "
            'behavior while using features like "--check-tracks".'
        )

    # Every piece of track metadata except the duration is written as a tag.
    tags = dict(track)
    tags.pop("duration", None)
    audio.export(
        destination,
        format=audio_format,
        bitrate=f"{config.utils.audio_bitrate}k",
        tags=tags,
    )

    return destination

reverse_title_and_artist(path_lookup)

Reverses the title and artist parts of the filename.

Parameters:

Name Type Description Default
path_lookup Dict[str, str]

Mapping of filenames to file paths.

required

Returns:

Type Description
Dict[str, str]

Mapping with the title and artist in the filenames reversed.

Source code in djtools/utils/helpers.py
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
def reverse_title_and_artist(path_lookup: Dict[str, str]) -> Dict[str, str]:
    """Swaps the title and artist parts of every filename key.

    The text after the last " - " in a key is treated as the artist and
    everything before it as the title. A key without " - " is treated as
    having an empty title, so it comes back with " - " appended.

    Args:
        path_lookup: Mapping of filenames to file paths.

    Returns:
        Mapping with the title and artist in the filenames reversed.
    """

    def swap(name):
        # rpartition splits on the final separator only, so a title that
        # itself contains " - " stays in one piece.
        title, _, artist = name.rpartition(" - ")
        return f"{artist} - {title}"

    return {swap(name): path for name, path in path_lookup.items()}

trim_initial_silence(audio, track_durations, trim_amount, silence_thresh=-50, min_silence_ms=5, step_size=100)

Heuristic for determining the amount of leading silence to trim.

Parameters:

Name Type Description Default
audio AudioSegment

Audio with leading silence.

required
track_durations List[int]

List of track durations.

required
trim_amount Union[int, TrimInitialSilenceMode]

Number of milliseconds to trim off the beginning.

required
silence_thresh Optional[float]

Maximum decibel level that's still considered silence.

-50
min_silence_ms Optional[int]

Surrounding milliseconds of each track to check for silence.

5
step_size Optional[int]

Initial step size when checking for leading silences.

100

Returns:

Name Type Description
AudioSegment AudioSegment

Audio with the beginning silence trimmed off.

Source code in djtools/utils/helpers.py
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
def trim_initial_silence(
    audio: AudioSegment,
    track_durations: List[int],
    trim_amount: Union[int, TrimInitialSilenceMode],
    silence_thresh: Optional[float] = -50,
    min_silence_ms: Optional[int] = 5,
    step_size: Optional[int] = 100,
) -> AudioSegment:
    """Heuristic for determining the amount of leading silence to trim.

    Args:
        audio: Audio with leading silence.
        track_durations: List of track durations.
        trim_amount: Either a number of milliseconds to trim off the beginning
            (a negative number prepends that much silence instead) or a
            TrimInitialSilenceMode. With the AUTO mode the detected leading
            silence is trimmed; with any other mode the offset is searched
            for using the track durations.
        silence_thresh: Maximum decibel level that's still considered silence.
        min_silence_ms: Surrounding milliseconds of each track to check for
            silence.
        step_size: Initial step size when checking for leading silences.

    Returns:
        AudioSegment: Audio with the beginning silence trimmed off.
    """
    # If trim_amount is an integer, then it's the number of milliseconds to
    # trim off the beginning of the recording. If a negative integer is
    # provided, then insert that many milliseconds of silence at the beginning
    # of the recording.
    if isinstance(trim_amount, int):
        if trim_amount >= 0:
            return audio[trim_amount:]
        return AudioSegment.silent(duration=abs(trim_amount)) + audio

    # Get the number of milliseconds of silence at the beginning of the
    # recording.
    leading_silence = silence.detect_leading_silence(
        audio, silence_threshold=silence_thresh, chunk_size=1
    )

    # If trim_amount is "auto", simply trim off the detected leading silence.
    if trim_amount == TrimInitialSilenceMode.AUTO:
        return audio[leading_silence:]

    # Without any leading silence there are no candidate offsets to score
    # (the search range below would be empty), so there is nothing to trim.
    if leading_silence <= 0:
        return audio

    # Use the track durations to infer the points in the recording where each
    # track should begin.
    start_points = []
    index = 0
    for track_duration in track_durations:
        index += track_duration
        start_points.append(index)

    # With a logarithmically decreasing step size, step through the potential
    # offsets to trim off the beginning of the recording.
    step_size = max(step_size, 1)
    offsets = [0, leading_silence]
    while step_size >= 1:
        scores = []
        for offset in range(*offsets, step_size):
            score = 0
            # For each track in the recording, build up a score based on the
            # number of surrounding milliseconds of silence.
            for point in start_points:
                for millisecond in range(1, min_silence_ms + 1):
                    if (
                        audio[offset + point + millisecond].dBFS
                        <= silence_thresh
                    ):
                        score += 1
                    if (
                        audio[offset + point - millisecond].dBFS
                        <= silence_thresh
                    ):
                        score += 1
            scores.append((score, offset))

        # Narrow the search window to the offsets of the two highest scores.
        # When the step was too coarse to produce two candidates, keep the
        # current window and just retry it with the finer step; collapsing the
        # window to a single bound would make the step act as the upper bound
        # of the next range.
        if len(scores) >= 2:
            top_two = sorted(scores, key=itemgetter(0), reverse=True)[:2]
            offsets = sorted(offset for _, offset in top_two)
        step_size //= 2

    # Trim off the start of the recording using the best offset of the final
    # (finest) pass; on ties the smallest offset wins.
    best_offset = max(scores, key=itemgetter(0))[1]
    audio = audio[best_offset:]

    return audio