(Python 计划)根据 csv 文件更改计划事件的运行时间



注意:我是SO的新手,所以对任何错误表示歉意。我以前用Python做过几个项目。

该计划

从本质上讲,该程序所做的是根据存储在.csv文件中的预定义时间表通知用户某些特定时间。时间每天都在变化(这提出了我稍后讨论的问题(。csv 文件如下所示:

日期,星期,列

1,日出,列3,列4等

3,星期一,值1,07:54,值3,值4

对于此示例,我将在整个程序演练中使用 Sunrise。

我有一个名为get_today_times((的函数,它负责读取 csv 并返回一个名为 time 的列表:

def get_today_times():
#code goes here
return times

然后还有另一个函数从上面的函数中获取时间:

def get_sunrise():
times = get_today_times():
sunrise_time = times[3]
final_string = "Sunrise is at" + sunrise_time
return sunrise_time, final_string

然后将上述函数调用到设置函数中:

def setup():
sunrise_time, sunrise_string = get_sunrise()
schedule_list = [sunrise_time, other variables following the same flow]
output_list = [sunrise_string, other variables following the same flow] 
return schedule_list, output_list

然后使用网络钩子将数据发送到 discord

def sunrise_discord():
notify_discord(final_string)
#The notify_discord function sends it to a discord server using a webhook

以上所有内容均使用附表(https://pypi.org/project/schedule/( 进行如下:

if __name__ == "__main__":
schedule_list, output_list = setup()
schedule.every().day.at(schedule_list[0]).do(sunrise_discord)
while True:
schedule.run_pending()
time.sleep(1)

问题所在

根据我的理解,代码从if __name__的开始运行,直到不断运行的while循环。因此,这意味着发送不和谐消息的时间每天保持不变,并产生如下消息:

(发送于07:54(日出在08:13

调度也不允许调度函数返回如下值:

#DOES NOT WORK
schedule_list, output_list = schedule.every().day.at("00:00").do(setup)

我试过这个:

Python 从计划事件返回值

但它对我不起作用:/我认为上述链接与我的方案之间的区别在于我需要直接更改计划运行事件的时间。

我尝试添加以下代码段,但仍然不起作用:

while True:
schedule.run_pending()
time.sleep(1)
if time.localtime().tm_hour == 00:
if time.localtime().tm_min == 00:
schedule_list, output_list = setup()

上面的代码应该在午夜重新运行setup((函数,然后重新分配schedule_list,因此重新分配它发送不和谐消息的时间,但这也不起作用:(

我考虑过将整个 python 程序安排在每天的特定时间运行。一旦所有预定的事件都发生,终止进程/结束程序,然后在第二天重新启动程序。我还没有真正对此做过任何研究,它只是在我的脑海中。我认为鉴于该程序在第一天正常工作,它会起作用。

任何帮助/想法或其他意见将不胜感激:)

谢谢

请注意:

我的代码可能不是最好的,但每天发送 21 条消息,我发现这种编码方式对我来说是最容易使用的。它仍处于第 1 阶段 Alpha 阶段,很可能会在完成之前进行更改。

问题是如何重新定义计划时间,其余代码仅用于上下文。

可以扩展调度作业和调度程序类。

这是如何在日出/日落时重新安排任务的考试

import datetime
from astral import LocationInfo
from astral.sun import sun
from schedule import Scheduler as _Scheduler
from schedule import Job as _Job

def job(message="Task"):
print("I'm working on:", message)

class Job(_Job):
city = LocationInfo("London", "England", "Europe/London", 51.5, -0.116)
last_sunrise = None
last_sunset = None
def __init__(self, interval, scheduler):
super().__init__(interval, scheduler)
def at_sunrise(self):
s = sun(self.city.observer, date=self.next_run)
self.last_sunrise = s["sunrise"]
return self.at(self.last_sunrise.strftime("%H:%M:%S"))
def at_sunset(self):
s = sun(self.city.observer, date=self.next_run)
self.last_sunset = s["sunset"]
return self.at(self.last_sunset.strftime("%H:%M:%S"))
def rescedule_astro(self):
if self.last_sunset:
self.at_sunset()
if self.last_sunrise:
self.at_sunrise()
def _schedule_next_run(self):
super()._schedule_next_run()
self.rescedule_astro()


class Scheduler(_Scheduler):
def __init__(self):
super().__init__()
def every(self, interval=1):
""" 
Schedule a new periodic job.
:param interval: A quantity of a certain time unit
:return: An unconfigured :class:`Job <Job>`
"""
job = Job(interval, self)
return job

schedule = Scheduler()
schedule.every().friday.at_sunrise().do(job, message="Friday Sunrise")
schedule.every().day.at_sunset().do(job, message="Everyday Sunset")
print(schedule.jobs)
print(schedule.run_all())
print(schedule.jobs)

最新更新