flexmeasures.data.services.scheduling

Logic around scheduling (jobs)

Functions

flexmeasures.data.services.scheduling.create_scheduling_job(asset_or_sensor: Asset | Sensor | None = None, sensor: Sensor | None = None, job_id: str | None = None, enqueue: bool = True, requeue: bool = False, force_new_job_creation: bool = False, scheduler_specs: dict | None = None, **scheduler_kwargs) Job

Create a new Job, which is queued for later execution.

To support quick retrieval of the scheduling job, the job id is the unique entity address of the UDI event. That means one event leads to one job (i.e. actions are event driven).

As a rule of thumb, keep arguments to the job simple, and deserializable.

The life cycle of a scheduling job:

1. A scheduling job is born here (in create_scheduling_job).
2. It is run in make_schedule, which writes results to the db.
3. If an error occurs (and the worker is configured accordingly), handle_scheduling_exception comes in.

Parameters:

asset_or_sensor – asset or sensor for which the schedule is computed

job_id – optionally, set a job id explicitly

enqueue – if True, enqueues the job in case it is new

requeue – if True, requeues the job in case it is not new and had previously failed (this argument is used by the @job_cache decorator)

force_new_job_creation – if True, this attribute forces a new job to be created (skipping cache) (this argument is used by the @job_cache decorator)

Returns:

the job
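
A minimal usage sketch. It assumes an active application/database context, a hypothetical sensor with id 1, and that start, end and resolution are passed through **scheduler_kwargs to make_schedule (documented below):

from datetime import datetime, timedelta, timezone

from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.services.scheduling import create_scheduling_job

power_sensor = Sensor.query.get(1)  # hypothetical sensor id
job = create_scheduling_job(
    asset_or_sensor=power_sensor,
    start=datetime(2025, 1, 1, tzinfo=timezone.utc),
    end=datetime(2025, 1, 2, tzinfo=timezone.utc),
    resolution=timedelta(minutes=15),
    enqueue=True,  # queue the job right away (the default)
)
print(job.id)  # the job id doubles as a handle for retrieving the job later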

flexmeasures.data.services.scheduling.find_scheduler_class(asset_or_sensor: Asset | Sensor) type

Find out which scheduler to use, given an asset or sensor. This will morph into a logic store utility, and schedulers should be registered for asset types there, instead of this fixed lookup logic.
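
A brief sketch, again assuming a hypothetical sensor with id 1; which Scheduler subclass is returned depends on the asset type:

from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.services.scheduling import find_scheduler_class

power_sensor = Sensor.query.get(1)  # hypothetical sensor id
scheduler_class = find_scheduler_class(power_sensor)
print(scheduler_class.__name__)  # e.g. "StorageScheduler" for a battery sensor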

flexmeasures.data.services.scheduling.get_data_source_for_job(job: Job) DataSource | None

Try to find the data source linked by this scheduling job.

We expect that enough info on the source was placed in the meta dict, either:

- the DataSource ID itself (i.e. the normal situation), or
- enough info to facilitate a DataSource query (as a fallback).
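
A hedged sketch of resolving the data source once a worker has finished the job (requires an application/database context); here, job is an rq Job, e.g. the one returned by create_scheduling_job:

from flexmeasures.data.services.scheduling import get_data_source_for_job

data_source = get_data_source_for_job(job)
if data_source is not None:
    print(data_source.id, data_source.name)
else:
    print(job.meta)  # inspect what source info the job carried, if any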

flexmeasures.data.services.scheduling.handle_scheduling_exception(job, exc_type, exc_value, traceback)

Store the exception as job metadata.

flexmeasures.data.services.scheduling.load_custom_scheduler(scheduler_specs: dict) type

Read in a custom scheduler spec and attempt to load the Scheduler class to use.

The scheduler class should be derived from flexmeasures.data.models.planning.Scheduler and should have a class method named "compute".

Example specs:

{
    "module": "/path/to/module.py",  # or something importable, e.g. "package.module"
    "class": "NameOfSchedulerClass",
}
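
A hedged sketch of what such a module could contain and how it might be loaded; the compute() body is a placeholder, and the author/version attributes are an assumption (commonly used to label the resulting data source), not confirmed API:

# Contents of /path/to/module.py (a sketch)
from flexmeasures.data.models.planning import Scheduler


class NameOfSchedulerClass(Scheduler):
    __author__ = "Me"    # assumption: identifying attributes for the data source
    __version__ = "0.1"

    def compute(self, *args, **kwargs):
        """Compute and return the schedule (e.g. a pandas Series of power values)."""
        ...


# Loading it elsewhere:
from flexmeasures.data.services.scheduling import load_custom_scheduler

scheduler_class = load_custom_scheduler(
    {"module": "/path/to/module.py", "class": "NameOfSchedulerClass"}
)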

flexmeasures.data.services.scheduling.make_schedule(sensor_id: int | None = None, start: datetime | None = None, end: datetime | None = None, resolution: timedelta | None = None, asset_or_sensor: dict | None = None, belief_time: datetime | None = None, flex_model: dict | None = None, flex_context: dict | None = None, flex_config_has_been_deserialized: bool = False, scheduler_specs: dict | None = None) bool

This function computes a schedule. It returns True if it ran successfully.

It can be queued as a job (see create_scheduling_job). In that case, it will probably run on a different FlexMeasures node than where the job is created. In any case, this function expects flex_model and flex_context to not have been deserialized yet.

This is what this function does:

- Find out which scheduler should be used & compute the schedule
- Turn scheduled values into beliefs and save them to db
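
A hedged sketch of calling make_schedule directly (normally an RQ worker runs it as part of a job); the asset_or_sensor dict shape and the flex_model contents are illustrative assumptions:

from datetime import datetime, timedelta, timezone

from flexmeasures.data.services.scheduling import make_schedule

success = make_schedule(
    asset_or_sensor={"class": "Sensor", "id": 1},  # hypothetical reference
    start=datetime(2025, 1, 1, tzinfo=timezone.utc),
    end=datetime(2025, 1, 2, tzinfo=timezone.utc),
    resolution=timedelta(minutes=15),
    flex_model={"soc-at-start": 0.2},  # still serialized, as this function expects
    flex_context={},
)
assert success  # True means the schedule was computed and saved as beliefs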

flexmeasures.data.services.scheduling.trigger_optional_fallback(job, connection, type, value, traceback)

Create a fallback schedule job when the error is of type InfeasibleProblemException.