AttributeError: 'module'对象没有属性'_strptime' (threading time.strptime using threading.Python2.7 中的线



我想知道是否有人遇到过这个问题并有解决方案。

Traceback主要错误:AttributeError:"module"对象没有属性"_strptime">

在Python 2.7的线程中运行time.strptime时与此问题类似:https://mail.python.org/pipermail/python-list/2015-October/697689.html

from datetime import datetime
import functools
import logging
import time
from threading import currentThread, Thread
import sys
log = logging.getLogger(__name__)

def _worker_process_queue(worker_fnc, input_queue, output_queue):
"""Function to get item from input queue and call `worker_fnc` on it and store the result of it in output queue
Args:
worker_fnc (function): task function
input_queue (list): List that stores all inputs that needs to be passed to worker_fnc
output_queue (list): List that stores all results/output from worker_fnc
NOTE:
Using List and not Queue because the operations being performed on the shared List are all atomic
(.pop, .append)
http://effbot.org/pyfaq/what-kinds-of-global-value-mutation-are-thread-safe.htm
https://stackoverflow.com/questions/6319207/are-lists-thread-safe
"""
err = None
task_output = None
_task = input_queue.pop()           # NOTE: This list operation is thread-safe
task_input = _task['task_input']
task_index = _task['task_index']
try:
log.debug('Thread: {} is running: {} with kwargs: {}'.format(currentThread(), worker_fnc, task_input))
task_output = worker_fnc(**task_input)
except Exception as e:
err = e
log.exception("run_tasks_concurrently._worker unhandled Exception: {}".format(e))
finally:
# NOTE: This list operation is thread-safe
output_queue.append({'task_output': task_output, 'task_error': err, 'task_index': task_index})

def run_tasks_concurrently(pool_size, worker_fnc, tasks):
"""Run I/O Tasks (e.g. GET requests calls) concurrently using Threads
This function blocks until all tasks are completed
Args:
pool_size (int): Total amount of threads to spawn at a given time
worker_fnc (function): Worker function that will get called at each thread.
NOTE: Make sure that `worker_fnc` is thread safe
https://en.wikipedia.org/wiki/Thread_safety
tasks (list): List of dict, where keys and values of the dict are same as kwargs
of `worker_fnc`
Returns:
list: List of items, where each item is the return of `worker_fnc`. Order of this list is the same as `tasks`
Logic:
1. Create a input queue (list to store all inputs that needs to be passed to worker_fnc)
2. Populate the input queue with items from `tasks`
2. Create a output queue (list to store all outputs after running worker_fnc)
2. For each task start a thread (that will run in background), total of `pool_size` threads will run at a time
3. Within each thread:
a. Get item from input queue and call `worker_fnc` on it and store the result of it in output queue
b. Handle any exception and store it in the output queue
4. After all threads have run, get all results from output queue and order then in order of input
5. Throw RunTime exception if any unhandled exception is found in one of the task
Example:
>>> import requests
>>> def worker(ip):
>>>     return {'ip': ip, 'code': requests.get('http://{}'.format(ip)).status_code}
>>> pool_size = 5
>>> tasks = [{'ip': "172.217.0.{}".format(i)} for i in range(pool_size)]
>>> print(run_tasks_concurrently(pool_size, worker, tasks))
[{'ip': '172.217.0.0', 'code': 200}, {'ip': '172.217.0.1', 'code': 200}, ...]
"""
# Populate the tasks in the input_queue
output_queue = []
input_queue = []
size_of_tasks = len(tasks)
for i, task in reversed(list(enumerate(tasks))):          # Reversing so that worker can pop and use last entry
input_queue.append({'task_input': task, 'task_index': i})
start_time = datetime.now()
log.debug("Starting run_tasks_concurrently {} tasks with pool_size {}".format(size_of_tasks, pool_size))
# For each task start a thread (that will run in background), total of `pool_size` threads will run at a time
threads = []
all_chunked_tasks = chunks(tasks, pool_size)
for i, chunked_tasks in enumerate(all_chunked_tasks):
for chunked_tasks in chunked_tasks:
t = Thread(target=_worker_process_queue, args=(worker_fnc, input_queue, output_queue))
t.daemon = True
t.start()
threads.append(t)
# Block the code until chunked tasks threads complete
log.debug('Waiting for tasks {}->{} to finish out of total {} tasks'
.format(pool_size * i, pool_size * (i + 1), size_of_tasks))
[t.join() for t in threads]
# After all threads have run, get all results from output queue and order then in order of input
results = [None] * size_of_tasks
unhandled_exceptions = []
for _result in output_queue:
task_error = _result.get('task_error')
task_index = _result['task_index']
# Order the result in the order of tasks
results[task_index] = _result['task_output']
if task_error:
unhandled_exceptions.append((task_error, task_index))
# Throw RunTime exception if any unhandled exception is found in one of the task
if unhandled_exceptions:
error_msgs = ["For give task input {} unhandled exception {}".format(tasks[task_index], task_error)
for task_error, task_index in unhandled_exceptions]
error_msgs = "n".join(error_msgs)
raise RuntimeError("Unhandled exceptions caught in threads:n{}".format(error_msgs))
# Get the result, close all threads and return results
end_time = datetime.now() - start_time
log.debug("runtime for run_tasks_concurrently: {} result length: {}".format(end_time, len(tasks)))
return results

def chunks(l, n):
"""Yield successive n-sized chunks from l.
Args:
l (list): list of items that needs to be evenly chunked
n (int): chunk size
Returns:
list: list of list
Example:
>>> chunks(range(5), 2)
[[0,1], [2,3], [4,]]
>>>
"""
for i in range(0, len(l), n):
yield l[i:i + n]
def a_worker_fnc(x):
time.strptime('Tue, 31 Jul 2018 17:15:24 GMT', '%a, %d %b %Y %X GMT')
return x*x

def main():
print(sys.version)
t1 = datetime.now()
logging.basicConfig(level=logging.NOTSET, format="%(asctime)s %(levelname)s %(name)s %(process)d/%(thread)d: %(message)s")
no_of_jobs,no_of_workers = 2,2
log.info('Starting script for # of jobs: {} running # of workers at a time {}...'.format(no_of_jobs, no_of_workers))
tasks = [{"x": x}for x in range(no_of_jobs)]
print(run_tasks_concurrently(no_of_workers, a_worker_fnc, tasks))
print(datetime.now() - t1)
if __name__ == '__main__':
main()

追溯

2.7.8 |Continuum Analytics, Inc.| (default, Aug 21 2014, 18:22:21) 
[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)]
2018-07-31 17:18:07,888 INFO __main__ 21304/139920311363328: Starting script for # of jobs: 2 running # of workers at a time 2...
2018-07-31 17:18:07,888 DEBUG __main__ 21304/139920311363328: Starting run_tasks_concurrently 2 tasks with pool_size 2
2018-07-31 17:18:07,889 DEBUG __main__ 21304/139920173688576: Thread: <Thread(Thread-1, started daemon 139920173688576)> is running: <function a_worker_fnc at 0x7f41b440f140> with kwargs: {'x': 0}
2018-07-31 17:18:07,890 DEBUG __main__ 21304/139920161081088: Thread: <Thread(Thread-2, started daemon 139920161081088)> is running: <function a_worker_fnc at 0x7f41b440f140> with kwargs: {'x': 1}
2018-07-31 17:18:07,890 DEBUG __main__ 21304/139920311363328: Waiting for tasks 0->2 to finish out of total 2 tasks
2018-07-31 17:18:07,890 ERROR __main__ 21304/139920161081088: run_tasks_concurrently._worker unhandled Exception: 'module' object has no attribute '_strptime_time'
Traceback (most recent call last):
File "/home/barikak/workspace/py27sandbox/concurrency_vs_threading_vs_multiprocessing/threading_example.py", line 34, in _worker_process_queue
task_output = worker_fnc(**task_input)
File "/home/barikak/workspace/py27sandbox/concurrency_vs_threading_vs_multiprocessing/threading_example.py", line 152, in a_worker_fnc
time.strptime('Tue, 31 Jul 2018 17:15:24 GMT', '%a, %d %b %Y %X GMT')
AttributeError: 'module' object has no attribute '_strptime_time'
Traceback (most recent call last):
File "/home/barikak/workspace/py27sandbox/concurrency_vs_threading_vs_multiprocessing/threading_example.py", line 168, in <module>
main()
File "/home/barikak/workspace/py27sandbox/concurrency_vs_threading_vs_multiprocessing/threading_example.py", line 164, in main
print(run_tasks_concurrently(no_of_workers, a_worker_fnc, tasks))
File "/home/barikak/workspace/py27sandbox/concurrency_vs_threading_vs_multiprocessing/threading_example.py", line 123, in run_tasks_concurrently
raise RuntimeError("Unhandled exceptions caught in threads:n{}".format(error_msgs))
RuntimeError: Unhandled exceptions caught in threads:
For give task input {'x': 1} unhandled exception 'module' object has no attribute '_strptime_time'

Python3.4通过时也是如此

(pyenv34) barikak@dev-dsk:~% python /home/barikak/workspace/py27sandbox/concurrency_vs_threading_vs_multiprocessing/threading_example.py
3.4.0 |Continuum Analytics, Inc.| (default, Mar 17 2014, 16:13:14)
[GCC 4.1.2 20080704 (Red Hat 4.1.2-54)]
2018-07-31 17:22:41,863 INFO __main__ 22009/140545970165504: Starting script for # of jobs: 2 running # of workers at a time 2...
2018-07-31 17:22:41,863 DEBUG __main__ 22009/140545970165504: Starting run_tasks_concurrently 2 tasks with pool_size 2
2018-07-31 17:22:41,863 DEBUG __main__ 22009/140545839482624: Thread: <Thread(Thread-1, started daemon 140545839482624)> is running: <function a_worker_fnc at 0x7fd360d8c6a8> with kwargs: {'x': 0}
2018-07-31 17:22:41,864 DEBUG __main__ 22009/140545758328576: Thread: <Thread(Thread-2, started daemon 140545758328576)> is running: <function a_worker_fnc at 0x7fd360d8c6a8> with kwargs: {'x': 1}
2018-07-31 17:22:41,864 DEBUG __main__ 22009/140545970165504: Waiting for tasks 0->2 to finish out of total 2 tasks
2018-07-31 17:22:41,869 DEBUG __main__ 22009/140545970165504: runtime for run_tasks_concurrently: 0:00:00.005774 result length: 2
[0, 1]
0:00:00.006079
(pyenv34) barikak@dev-dsk:~%

解决方案是做什么https://stackoverflow.com/a/22476843/558397已经描述了

import _strptime
from datetime import datetime
# then, in threaded block
datetime.strptime(date, format)

最新更新