This evening I tried adding a feature to handle Cartography exceptions consistently. I started by adding a new CLI arg:

parser.add_argument(
    '--on-exception',
    type=str,
    default='continue',
    help=(
        'Valid values: "continue" (default) or "stop". Determines what cartography will do when it encounters'
        'an unhandled sync exception. "continue" attempts to skip failed instructions and keep going, and'
        '"stop" makes the sync stop.'
    ),

And then I added it to the config object itself:

class Config:
	...
	self.on_exception = on_exception
	...

The trouble came when I tried to handle it in a sync. I first thought to do it at the sync level: if the AWS sync fails, then there’s no reason why we should make the GCP sync not even start.

But then I thought, we will almost definitely want more fine-grained control on this behavior; if the AWS IAM sync fails, then we should still continue with the AWS EC2 sync. This meant that I would need to pass the global config object into each intelmodule.

The chain looks like this:

def start_aws_ingestion(neo4j_session, config):

calls

_sync_multiple_accounts(neo4j_session, aws_accounts, config.update_tag, common_job_parameters)

which calls

_sync_one_account(neo4j_session, boto3_session, account_id, sync_tag, common_job_parameters)

which then calls

iam.sync(neo4j_session, boto3_session, account_id, sync_tag, common_job_parameters)

As you see, I would need to either alter the function signature of everything in that chain to pass in config.on_exception, or I would need to supply it as a common_job_parameter. I opted for the latter.

And then comes the matter of actually handling the exception. What would this look like?

In iam.sync(), I could do something like

def sync(neo4j_session, boto3_session, account_id, update_tag, common_job_parameters):
    logger.info("Syncing IAM for account '%s'.", account_id)
    on_exception = common_job_parameters['on_exception']
    
    functions = [
        sync_users, sync_groups, sync_roles, sync_group_memberships, sync_user_access_keys
    ]
    for func in functions:
        try:
            func(neo4j_session, boto3_session, account_id, update_tag, common_job_parameters)
        except Exception as e:
            if on_exception == 'stop':
                raise
            else:
                logger.error(e)
                continue

The problem here though is that new module authors would need to copy this (garbage) boilerplate code and this is not a scaleable, general enough solution.

Let’s step back a bit, what am I trying to do here?

I need the sync to be more reliable. If it fails, then I need to know about it, but I don’t want it to give up on the rest of my data because a full sync run takes about 10 hours and I don’t want to wait 12 hours for the next cron job to start, and I also don’t want to be bothered kicking off a manual run.

So, now I’m back to working on DAGs. I don’t care about any parallelism or speed-up factor just yet, I just don’t want avoidable crashes. So yeah, I’m going to play around with https://github.com/lyft/cartography/pull/289 again, and if that doesn’t work out, I think I’ll learn me some Airflow.