Source code for commands.check_non_onboarded_devices
"""Management command to check for not onboarded devices."""

from __future__ import annotations

from typing import Any, cast

from devices.models import DeviceModel, OnboardingStatus
from django.core.management.base import BaseCommand
from django.utils import timezone
from notifications.models import NotificationModel, NotificationStatus

# Fetch (or create on first use) the NotificationStatus row with status 'NEW'.
# NOTE: this statement executes a database query when the module is imported,
# so importing this module requires a reachable database.
# `created` (whether the row was newly inserted) is not used in the visible
# part of this file.
new_status, created = NotificationStatus.objects.get_or_create(status='NEW')
class Command(BaseCommand):
    """Management command to check for devices not onboarded.

    This command identifies devices whose onboarding is still pending
    (``OnboardingStatus.PENDING``) and generates an informational notification
    for each of them. If a ``DEVICE_NOT_ONBOARDED`` notification for a specific
    device already exists, that device is skipped.
    """

    def handle(self, *args: Any, **kwargs: Any) -> None:  # noqa: ARG002
        """Entrypoint for the command.

        Runs the non-onboarded devices check and writes a success message to
        the command's stdout afterwards.

        Args:
            *args: Additional positional arguments (unused).
            **kwargs: Additional keyword arguments (unused).
        """
        self._check_non_onboarded_devices()
        self.stdout.write(self.style.SUCCESS('Non-onboarded devices check completed.'))

    def _check_non_onboarded_devices(self) -> None:
        """Create an info notification for every device that is not onboarded.

        A device is considered not onboarded when its onboarding config has the
        status ``OnboardingStatus.PENDING``. Devices that already have a
        notification with the event ``DEVICE_NOT_ONBOARDED`` are left untouched,
        so running the command repeatedly does not create duplicates.
        """
        # Local import: keeps this block self-contained without touching the
        # file-level import section.
        from django.db import transaction

        event = 'DEVICE_NOT_ONBOARDED'

        # Resolve the 'NEW' status at run time instead of relying on an object
        # fetched once at import time, which may no longer match a row in the
        # database (e.g. after the table has been flushed).
        status_new, _ = NotificationStatus.objects.get_or_create(status='NEW')

        pending_devices = DeviceModel.objects.filter(
            onboarding_config__onboarding_status=OnboardingStatus.PENDING
        )

        for device in pending_devices:
            if NotificationModel.objects.filter(event=event, device=device).exists():
                # Already notified about this device.
                continue

            typed_device = cast(DeviceModel, device)
            # NOTE(review): assumes every pending device has a domain assigned;
            # `domain` being None would raise AttributeError here -- confirm.
            message_data = {
                'device': typed_device.common_name,
                'domain': typed_device.domain.unique_name,
            }

            # Create the notification and attach its status atomically. A
            # notification persisted without the 'NEW' status would otherwise
            # never be repaired, because the existence check above skips it on
            # every later run.
            with transaction.atomic():
                notification = NotificationModel.objects.create(
                    device=device,
                    created_at=timezone.now(),
                    notification_source=NotificationModel.NotificationSource.DEVICE,
                    notification_type=NotificationModel.NotificationTypes.INFO,
                    message_type=NotificationModel.NotificationMessageType.DEVICE_NOT_ONBOARDED,
                    event=event,
                    message_data=message_data,
                )
                notification.statuses.add(status_new)