
Sync

This module contains the configuration object for the sync package. The attributes of this configuration object correspond to the "sync" key of config.yaml.
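For illustration, a minimal sketch (not taken from the project's documentation) of what the "sync" key might contain and how it maps onto SyncConfig; the values below are hypothetical:

from djtools.sync.config import SyncConfig

# Hypothetical "sync" section of config.yaml, expressed as a Python dict.
sync_settings = {
    "aws_profile": "default",
    "bucket_url": "",       # e.g. "s3://my-beatcloud-bucket"
    "upload_music": False,
    "usb_path": None,       # e.g. "/Volumes/DJ_USB"
}

config = SyncConfig(**sync_settings)  # "user" falls back to getpass.getuser()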

SyncConfig

Bases: BaseConfigFormatter

Configuration object for the sync package.

Source code in djtools/sync/config.py
class SyncConfig(BaseConfigFormatter):
    """Configuration object for the sync package."""

    artist_first: bool = False
    aws_profile: str = "default"
    aws_use_date_modified: bool = False
    bucket_url: str = ""
    discord_url: str = ""
    download_collection: bool = False
    download_exclude_dirs: List[Path] = []
    download_include_dirs: List[Path] = []
    download_music: bool = False
    download_spotify_playlist: str = ""
    dryrun: bool = False
    import_user: str = ""
    upload_collection: bool = False
    upload_exclude_dirs: List[Path] = []
    upload_include_dirs: List[Path] = []
    upload_music: bool = False
    usb_path: Optional[Path] = None
    user: str = ""

    def __init__(self, *args, **kwargs):
        """Constructor.

        Raises:
            ValueError: Both include and exclude dirs can't be provided at the
                same time.
            RuntimeError: aws_profile must be set.
        """
        super().__init__(*args, **kwargs)
        if not self.user:
            self.user = getpass.getuser()

        if (self.upload_include_dirs and self.upload_exclude_dirs) or (
            self.download_include_dirs and self.download_exclude_dirs
        ):
            msg = (
                "Config must neither contain both upload_include_dirs and "
                "upload_exclude_dirs or both download_include_dirs and "
                "download_exclude_dirs"
            )
            logger.critical(msg)
            raise ValueError(msg)

        if any(
            [
                self.download_collection,
                self.download_music,
                self.upload_collection,
                self.upload_music,
            ]
        ):
            if not self.aws_profile:
                msg = "Config must include aws_profile for sync operations"
                logger.critical(msg)
                raise RuntimeError(msg)

            if not self.bucket_url:
                msg = "Config must include bucket_url for sync operations"
                logger.critical(msg)
                raise RuntimeError(msg)

        os.environ["AWS_PROFILE"] = (
            self.aws_profile
        )  # pylint: disable=no-member

        if any([self.download_music, self.upload_music]) and not self.usb_path:
            msg = (
                "Config must include usb_path for both download_music and "
                "upload_music sync operations"
            )
            logger.critical(msg)
            raise RuntimeError(msg)

        if (
            any([self.download_music, self.upload_music])
            and not self.usb_path.exists()
        ):
            msg = f'Configured usb_path "{self.usb_path}" was not found!'
            logger.critical(msg)
            raise RuntimeError(msg)

        if self.upload_music and not self.discord_url:
            logger.warning(
                'discord_url is not configured...set this for "New Music" '
                "discord messages!"
            )

        if self.download_collection and not self.import_user:
            raise RuntimeError(
                "import_user must be set to download a collection"
            )
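To make the constructor's validation concrete, a hedged sketch (not from the project's tests) of the ValueError raised when both include and exclude dirs are configured for the same direction; the directory names are hypothetical:

from djtools.sync.config import SyncConfig

try:
    SyncConfig(
        upload_include_dirs=["Techno"],
        upload_exclude_dirs=["House"],
    )
except ValueError as exc:
    print(exc)  # include and exclude dirs must not be combined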

__init__(*args, **kwargs)

Constructor.

Raises:

    ValueError: Both include and exclude dirs can't be provided at the same time.
    RuntimeError: aws_profile must be set.

Source code in djtools/sync/config.py
def __init__(self, *args, **kwargs):
    """Constructor.

    Raises:
        ValueError: Both include and exclude dirs can't be provided at the
            same time.
        RuntimeError: aws_profile must be set.
    """
    super().__init__(*args, **kwargs)
    if not self.user:
        self.user = getpass.getuser()

    if (self.upload_include_dirs and self.upload_exclude_dirs) or (
        self.download_include_dirs and self.download_exclude_dirs
    ):
        msg = (
            "Config must neither contain both upload_include_dirs and "
            "upload_exclude_dirs or both download_include_dirs and "
            "download_exclude_dirs"
        )
        logger.critical(msg)
        raise ValueError(msg)

    if any(
        [
            self.download_collection,
            self.download_music,
            self.upload_collection,
            self.upload_music,
        ]
    ):
        if not self.aws_profile:
            msg = "Config must include aws_profile for sync operations"
            logger.critical(msg)
            raise RuntimeError(msg)

        if not self.bucket_url:
            msg = "Config must include bucket_url for sync operations"
            logger.critical(msg)
            raise RuntimeError(msg)

    os.environ["AWS_PROFILE"] = (
        self.aws_profile
    )  # pylint: disable=no-member

    if any([self.download_music, self.upload_music]) and not self.usb_path:
        msg = (
            "Config must include usb_path for both download_music and "
            "upload_music sync operations"
        )
        logger.critical(msg)
        raise RuntimeError(msg)

    if (
        any([self.download_music, self.upload_music])
        and not self.usb_path.exists()
    ):
        msg = f'Configured usb_path "{self.usb_path}" was not found!'
        logger.critical(msg)
        raise RuntimeError(msg)

    if self.upload_music and not self.discord_url:
        logger.warning(
            'discord_url is not configured...set this for "New Music" '
            "discord messages!"
        )

    if self.download_collection and not self.import_user:
        raise RuntimeError(
            "import_user must be set to download a collection"
        )

This module is responsible for syncing tracks between "usb_path" and the Beatcloud (upload and download). It also handles uploading the collection located at "collection_path" and downloading the collection uploaded to the Beatcloud by "import_user", modifying it so that its tracks point to locations under "usb_path".
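As a rough, hedged sketch of how these operations might be combined for a download session (the call order is an assumption; config is a fully built BaseConfig whose construction is not shown):

from djtools.sync.sync_operations import download_collection, download_music

def download_everything(config):
    """Pull import_user's collection, then sync tracks down to usb_path."""
    download_collection(config)  # fetch and rewrite the other user's collection
    download_music(config)       # sync tracks from the Beatcloud to usb_path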

download_collection(config)

This function downloads the collection of "import_user".

After downloading "import_user"'s collection, the locations of all the tracks are modified so that they point to the user's "usb_path".

Parameters:

    config (BaseConfig): Configuration object. Required.
Source code in djtools/sync/sync_operations.py
def download_collection(config: BaseConfig):
    """This function downloads the collection of "import_user".

    After downloading "import_user"'s collection, the location of all the
    tracks are modified so that they point to user's "usb_path".

    Args:
        config: Configuration object.
    """
    logger.info(
        f"Downloading {config.sync.import_user}'s {config.collection.platform.value} collection..."
    )
    collection_dir = config.collection.collection_path.parent
    src = (
        f"{config.sync.bucket_url}/dj/collections/{config.sync.import_user}/"
        f"{config.collection.platform.value}_collection"
    )
    dst = (
        Path(collection_dir)
        / f"{config.sync.import_user}_{config.collection.collection_path.name}"
    )
    cmd = ["aws", "s3", "cp", src, dst.as_posix()]
    if config.collection.collection_path.is_dir():
        cmd.append("--recursive")
    logger.info(" ".join(cmd))
    with Popen(cmd) as proc:
        proc.wait()
    if config.sync.user != config.sync.import_user:
        rewrite_track_paths(config, dst)
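For illustration only, a sketch of the S3 source and local destination that this builds; the bucket, user, platform value, and collection path below are hypothetical:

from pathlib import Path

bucket_url = "s3://beatcloud-bucket"                      # hypothetical
import_user = "alice"                                     # hypothetical
platform = "rekordbox"                                    # hypothetical platform value
collection_path = Path("/collections/my_collection.xml")  # hypothetical

src = f"{bucket_url}/dj/collections/{import_user}/{platform}_collection"
dst = collection_path.parent / f"{import_user}_{collection_path.name}"
print(src)  # s3://beatcloud-bucket/dj/collections/alice/rekordbox_collection
print(dst)  # /collections/alice_my_collection.xml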

download_music(config, beatcloud_tracks=None)

This function syncs tracks from the Beatcloud to "usb_path".

If "download_spotify_playlist" is set to a playlist name that exists in "spotify_playlists.yaml", then "download_include_dirs" will be populated with tracks in that playlist that match Beatcloud tracks.

Parameters:

    config (BaseConfig): Configuration object. Required.
    beatcloud_tracks (Optional[List[str]]): List of track artist - titles from S3. Defaults to None.
Source code in djtools/sync/sync_operations.py
def download_music(
    config: BaseConfig, beatcloud_tracks: Optional[List[str]] = None
):
    """This function syncs tracks from the Beatcloud to "usb_path".

    If "download_spotify_playlist" is set to a playlist name that exists in
    "spotify_playlists.yaml", then "download_include_dirs" will be populated
    with tracks in that playlist that match Beatcloud tracks.

    Args:
        config: Configuration object.
        beatcloud_tracks: List of track artist - titles from S3.
    """
    if config.sync.download_spotify_playlist:
        playlist_name = config.sync.download_spotify_playlist
        user = playlist_name.split("Uploads")[0].strip()
        beatcloud_tracks, beatcloud_matches = compare_tracks(
            config,
            beatcloud_tracks=beatcloud_tracks,
        )
        if not beatcloud_matches:
            logger.warning(
                "No Beatcloud matches were found! Make sure you've supplied "
                "the correct playlist name."
            )
            return beatcloud_tracks
        config.sync.download_include_dirs = [
            (Path(user) / path.as_posix().split(f"{Path(user)}/")[-1])
            for path in beatcloud_matches
        ]
        config.sync.download_exclude_dirs = []

    logger.info("Downloading track collection...")
    dest = Path(config.sync.usb_path) / "DJ Music"
    glob_path = (Path("**") / "*.*").as_posix()
    old = {str(p) for p in dest.rglob(glob_path)}
    logger.info(f"Found {len(old)} files at {config.sync.usb_path}")

    dest.mkdir(parents=True, exist_ok=True)
    cmd = [
        "aws",
        "s3",
        "sync",
        f"{config.sync.bucket_url}/dj/music/",
        dest.as_posix(),
    ]
    run_sync(parse_sync_command(cmd, config), config.sync.bucket_url)

    new = {str(p) for p in dest.rglob(glob_path)}
    difference = sorted(list(new.difference(old)), key=getmtime)
    if difference:
        logger.info(f"Found {len(difference)} new files")
        for diff in difference:
            logger.info(f"\t{diff}")

    return beatcloud_tracks
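A hedged sketch of how a matched Beatcloud path is reduced to a download_include_dirs entry rooted at the uploading user's folder; the user name and path are made up, and the exact shape of beatcloud_matches is an assumption:

from pathlib import Path

user = "alice"                                            # hypothetical
match = Path("dj/music/alice/Techno/Artist - Title.mp3")  # hypothetical match

include_dir = Path(user) / match.as_posix().split(f"{Path(user)}/")[-1]
print(include_dir)  # alice/Techno/Artist - Title.mp3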

upload_collection(config)

This function uploads "collection_path" to the cloud.

Parameters:

    config (BaseConfig): Configuration object. Required.
Source code in djtools/sync/sync_operations.py
def upload_collection(config: BaseConfig):
    """This function uploads "collection_path" to the cloud.

    Args:
        config: Configuration object.
    """
    logger.info(
        f"Uploading {config.sync.user}'s {config.collection.platform.value} collection..."
    )
    dst = (
        f"{config.sync.bucket_url}/dj/collections/{config.sync.user}/"
        f"{config.collection.platform.value}_collection"
    )
    cmd = [
        "aws",
        "s3",
        "cp",
        config.collection.collection_path.as_posix(),
        dst,
    ]
    if config.collection.collection_path.is_dir():
        cmd.append("--recursive")
    logger.info(" ".join(cmd))
    with Popen(cmd) as proc:
        proc.wait()

upload_music(config)

This function syncs tracks from "usb_path" to the Beatcloud.

"aws_use_date_modified" can be used in order to re-upload tracks that already exist in the Beatcloud but have been modified since the last time they were uploaded (i.e. ID3 tags have been altered).

Parameters:

    config (BaseConfig): Configuration object. Required.
Source code in djtools/sync/sync_operations.py
def upload_music(config: BaseConfig):
    """This function syncs tracks from "usb_path" to the Beatcloud.

    "aws_use_date_modified" can be used in order to re-upload tracks that
    already exist in the Beatcloud but have been modified since the last time
    they were uploaded (i.e. ID3 tags have been altered).

    Args:
        config: Configuration object.
    """
    hidden_files = set(
        (Path(config.sync.usb_path) / "DJ Music").rglob(
            (Path("**") / ".*.*").as_posix()
        )
    )
    if hidden_files:
        logger.info(f"Removed {len(hidden_files)} files...")
        for _file in hidden_files:
            logger.info(f"\t{_file}")
            _file.unlink()

    logger.info("Uploading track collection...")
    src = (Path(config.sync.usb_path) / "DJ Music").as_posix()
    cmd = ["aws", "s3", "sync", src, f"{config.sync.bucket_url}/dj/music/"]

    if config.sync.discord_url and not config.sync.dryrun:
        webhook(
            config.sync.discord_url,
            content=run_sync(
                parse_sync_command(cmd, config, upload=True),
                config.sync.bucket_url,
            ),
        )
    else:
        run_sync(
            parse_sync_command(cmd, config, upload=True),
            config.sync.bucket_url,
        )
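A small sketch of the hidden-file pattern used above: it matches dotfiles that also carry an extension (for example macOS "._" resource-fork copies of tracks) but not extensionless ones; the filenames are hypothetical:

import fnmatch
from pathlib import Path

pattern = (Path("**") / ".*.*").as_posix()
print(pattern)  # **/.*.*

# The ".*.*" component needs a leading dot and an extension, so "._Track.mp3"
# matches while ".DS_Store" does not.
print(fnmatch.fnmatch("._Track.mp3", ".*.*"))  # True
print(fnmatch.fnmatch(".DS_Store", ".*.*"))    # False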

This module contains helper functions used by the "sync_operations" module. Helper functions include formatting "aws s3 sync" commands, formatting the output of "aws s3 sync" commands, posting uploaded tracks to Discord, and modifying import_user's collection to point to tracks located at "usb_path".

parse_sync_command(_cmd, config, upload=False)

Appends flags to the "aws s3 sync" command. If "*_include_dirs" is specified, all directories are ignored except those specified. If "*_exclude_dirs" is specified, all directories are included except those specified. Only one of these can be specified at once. If "aws_use_date_modified" is set, tracks will be re-downloaded / re-uploaded if their date modified at the source is later than at the destination.

Parameters:

    _cmd (List[str]): Partial "aws s3 sync" command. Required.
    config (BaseConfig): Configuration object. Required.
    upload (Optional[bool]): Whether uploading or downloading. Defaults to False.

Returns:

    List[str]: Fully constructed "aws s3 sync" command.

Source code in djtools/sync/helpers.py
def parse_sync_command(
    _cmd: List[str],
    config: BaseConfig,
    upload: Optional[bool] = False,
) -> List[str]:
    """Appends flags to "aws s3 sync" command. If "*_include_dirs" is
        specified, all directories are ignored except those specified. If
        "*_exclude_dirs" is specified, all directories are included except
        those specified. Only one of these can be specified at once. If
        "aws_use_date_modified", then tracks will be
        re-downloaded / re-uploaded if their date modified at the source is
        after that of the destination.

    Args:
        _cmd: Partial "aws s3 sync" command.
        config: Configuration object.
        upload: Whether uploading or downloading.

    Returns:
        Fully constructed "aws s3 sync" command.
    """
    if (upload and config.sync.upload_include_dirs) or (
        not upload and config.sync.download_include_dirs
    ):
        _cmd.extend(["--exclude", "*"])
        directories = map(
            Path,
            getattr(
                config.sync, f'{"up" if upload else "down"}load_include_dirs'
            ),
        )
        for _dir in directories:
            path = Path(_dir.stem)
            ext = _dir.suffix
            if not ext:
                path = _dir / "*"
            else:
                path = _dir.parent / path.with_suffix(ext)
            _cmd.extend(["--include", path.as_posix()])
    if (upload and config.sync.upload_exclude_dirs) or (
        not upload and config.sync.download_exclude_dirs
    ):
        _cmd.extend(["--include", "*"])
        directories = map(
            Path,
            getattr(
                config.sync, f'{"up" if upload else "down"}load_exclude_dirs'
            ),
        )
        for _dir in directories:
            path = Path(_dir.stem)
            ext = _dir.suffix
            if not ext:
                path = _dir / "*"
            else:
                path = _dir.parent / path.with_suffix(ext)
            _cmd.extend(["--exclude", path.as_posix()])
    if not config.sync.aws_use_date_modified:
        _cmd.append("--size-only")
    if config.sync.dryrun:
        _cmd.append("--dryrun")
    logger.info(" ".join(_cmd))

    return _cmd
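To illustrate the suffix handling above, a hedged sketch (paths are hypothetical): a bare directory becomes a wildcard pattern, while a path with an extension is passed through unchanged:

from pathlib import Path

def include_pattern(_dir: Path) -> str:
    # Mirrors the per-directory logic in parse_sync_command.
    if not _dir.suffix:
        return (_dir / "*").as_posix()
    return (_dir.parent / Path(_dir.stem).with_suffix(_dir.suffix)).as_posix()

print(include_pattern(Path("alice/Techno")))            # alice/Techno/*
print(include_pattern(Path("alice/Techno/Track.mp3")))  # alice/Techno/Track.mp3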

rewrite_track_paths(config, other_user_collection)

This function modifies the location of tracks in a collection.

This is done by replacing the "usb_path" written by "import_user" with the "usb_path" in "config.yaml".

Parameters:

    config (BaseConfig): Configuration object. Required.
    other_user_collection (Path): Path to another user's collection. Required.
Source code in djtools/sync/helpers.py
@make_path
def rewrite_track_paths(config: BaseConfig, other_user_collection: Path):
    """This function modifies the location of tracks in a collection.

    This is done by replacing the "usb_path" written by "import_user" with the
    "usb_path" in "config.yaml".

    Args:
        config: Configuration object.
        other_user_collection: Path to another user's collection.
    """
    music_path = Path("DJ Music")
    collection = PLATFORM_REGISTRY[config.collection.platform]["collection"](
        path=other_user_collection
    )
    for track in collection.get_tracks().values():
        loc = track.get_location().as_posix()
        common_path = (
            music_path / loc.split(str(music_path) + "/", maxsplit=-1)[-1]
        )
        track.set_location(config.sync.usb_path / common_path)
    collection.serialize(path=other_user_collection)
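A hedged sketch of the rewrite itself, using made-up USB mount points: everything after the "DJ Music" folder is kept and re-rooted under the local "usb_path":

from pathlib import Path

music_path = Path("DJ Music")
imported_loc = Path("/Volumes/ALICE_USB/DJ Music/Techno/Track.mp3")  # import_user's location
my_usb_path = Path("/Volumes/MY_USB")                                # local usb_path

common_path = (
    music_path / imported_loc.as_posix().split(str(music_path) + "/", maxsplit=-1)[-1]
)
print(my_usb_path / common_path)  # /Volumes/MY_USB/DJ Music/Techno/Track.mp3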

run_sync(_cmd, bucket_url)

Runs a subprocess for the "aws s3 sync" command. Output is collected and formatted such that uploaded tracks are grouped by their directories.

Parameters:

    _cmd (List[str]): "aws s3 sync" command. Required.
    bucket_url (str): URL to an AWS S3 API compliant bucket. Required.

Raises:

    CalledProcessError: Raised if the "aws s3 sync" command fails.
    RuntimeError: Raised if any other exception occurs while syncing.

Returns:

    str: Formatted list of uploaded tracks; tracks are grouped by directory.

Source code in djtools/sync/helpers.py
def run_sync(_cmd: List[str], bucket_url: str) -> str:
    """Runs subprocess for "aws s3 sync" command. Output is collected and
        formatted such that uploaded tracks are grouped by their directories.

    Args:
        _cmd: "aws s3 sync" command.
        bucket_url: URL to an AWS S3 API compliant bucket.

    Raises:
        CalledProcessError: raised if "aws s3 sync" command fails.
        RuntimeError: raised if any other exception occurs while syncing.

    Returns:
        Formatted list of uploaded tracks; tracks are grouped by directory.
    """
    line = ""
    termination_chars = {"\n", "\r"}
    tracks = []
    try:
        with Popen(_cmd, stdout=PIPE) as proc:
            while True:
                try:
                    char = proc.stdout.read(1).decode()
                except UnicodeDecodeError:
                    char = ""
                if char == "" and proc.poll() is not None:
                    break
                if char not in termination_chars:
                    line += char
                    continue
                print(line, end=char)
                if char != "\r" and "upload: " in line:
                    line = line.split(f"{bucket_url}/dj/music/")[-1]
                    tracks.append(Path(line))
                line = ""
            proc.stdout.close()
            return_code = proc.wait()
        if return_code:
            raise CalledProcessError(return_code, " ".join(_cmd))
    except Exception as exc:
        msg = f"Failure while syncing: {exc}"
        logger.critical(msg)
        raise RuntimeError(msg) from exc

    new_music = ""
    for group_id, group in groupby(
        sorted(tracks, key=lambda x: x.parent.as_posix()),
        key=lambda x: x.parent.as_posix(),
    ):
        group = sorted(group)
        new_music += f"{group_id}: {len(group)}\n"
        for track in group:
            new_music += f"\t{track.name}\n"
    if new_music:
        logger.info(
            f"Successfully uploaded {len(tracks)} tracks:\n{new_music}"
        )

    return new_music
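To show the shape of the returned summary, a small sketch that reuses the same groupby logic on made-up track paths:

from itertools import groupby
from pathlib import Path

tracks = [
    Path("Techno/Track B.mp3"),
    Path("House/Track C.mp3"),
    Path("Techno/Track A.mp3"),
]
new_music = ""
for group_id, group in groupby(
    sorted(tracks, key=lambda x: x.parent.as_posix()),
    key=lambda x: x.parent.as_posix(),
):
    group = sorted(group)
    new_music += f"{group_id}: {len(group)}\n"
    for track in group:
        new_music += f"\t{track.name}\n"
print(new_music)
# House: 1
#     Track C.mp3
# Techno: 2
#     Track A.mp3
#     Track B.mp3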

upload_log(config, log_file)

This function uploads "log_file" to the "user" logs folder in S3. It then deletes all local log files that were last modified more than one day ago.

Parameters:

    config (BaseConfig): Configuration object. Required.
    log_file (Path): Path to log file. Required.
Source code in djtools/sync/helpers.py
@make_path
def upload_log(config: BaseConfig, log_file: Path):
    """This function uploads "log_file" to the "user" logs folder in S3. It
        then deletes all files created more than one day ago.

    Args:
        config: Configuration object.
        log_file: Path to log file.
    """
    if not config.sync.aws_profile:
        logger.warning(
            "Logs cannot be backed up without specifying the config option "
            "aws_profile"
        )
        return

    dst = (
        f"{config.sync.bucket_url}/dj/logs/{config.sync.user}/{log_file.name}"
    )
    cmd = ["aws", "s3", "cp", log_file.as_posix(), dst]
    logger.info(" ".join(cmd))
    with Popen(cmd) as proc:
        proc.wait()

    now = datetime.now()
    one_day = timedelta(days=1)
    for _file in log_file.parent.rglob("*"):
        if (
            _file.name != "__init__.py"
            and _file.is_file()
            and _file.lstat().st_mtime < (now - one_day).timestamp()
        ):
            _file.unlink()

webhook(url, content_size_limit=2000, content=None)

Posts the track list of newly uploaded tracks to the Discord channel associated with "url". The track list is split across multiple messages if its length exceeds "content_size_limit".

Parameters:

    url (str): Discord URL for webhook. Required.
    content_size_limit (int): Character limit for Discord messages; if content is larger, then multiple messages are sent. Defaults to 2000.
    content (Optional[str]): Uploaded tracks (if any). Defaults to None.
Source code in djtools/sync/helpers.py
def webhook(
    url: str, content_size_limit: int = 2000, content: Optional[str] = None
):
    """Post track list of newly uploaded tracks to Discord channel associated
        with "url". Track list is split across multiple messages if the
        character limit exceed "content_size_limit".

    Args:
        url (str): Discord URL for webhook.
        content_size_limit: Character limit for Discord message; if content is
            larger, then multiple messages are sent.
        content: Uploaded tracks (if any).
    """
    if not content:
        logger.info("There's no content")
        return

    batch = content[:content_size_limit]
    remainder = content[content_size_limit:]
    while batch:
        index = content_size_limit - 1
        while True:
            if index == 0:
                index = content_size_limit
                break
            try:
                if batch[index] == "\n":
                    break
            except IndexError:
                break
            index -= 1
        remainder = batch[index + 1 :] + remainder
        batch = batch[: index + 1]

        if batch:
            requests.post(url, json={"content": batch}, timeout=10)
            batch = remainder[:content_size_limit]
            remainder = remainder[content_size_limit:]
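A hedged sketch of the splitting behaviour, equivalent in spirit to the loop above (it uses str.rfind for brevity instead of the manual index walk); the content and limit are made up:

content = "dir1: 2\n\ttrack_a.mp3\n\ttrack_b.mp3\n"  # hypothetical run_sync output
limit = 20

batch = content[:limit]
index = batch.rfind("\n")             # last newline within the limit
first_message = content[: index + 1]
remainder = content[index + 1 :]
print(repr(first_message))  # 'dir1: 2\n'
print(repr(remainder))      # '\ttrack_a.mp3\n\ttrack_b.mp3\n'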