本文共 37792 字,大约阅读时间需要 125 分钟。
Python搭建RTMP流媒体服务器,实现网络摄像头的推流,并使用机器视觉模型Yolo-v3实时处理视频、输出分类标签。
# Multiple threads are used for stream ingestion and processing so that
# neither side stalls the other.
# -*- coding: utf-8 -*-
"""
Created on Mon Jul 6 22:21:49 2020

@author: Straka

Pull an RTMP camera stream, run four YOLOv3 (GluonCV/MXNet) model replicas
on it in parallel threads, and periodically write the dominant class label
to an HTML file.
"""
# =============================================================================
# Startup checklist:
# 1. Start Tomcat.
# 2. Start NGINX (cmd):
#      C:
#      cd C:\Users\Administrator\Desktop\nginx
#      start nginx
#      nginx -s reload
# 3. Start C:\ffmpeg-4.3-win64-static\bin\MulHYZY_dealOnly-2.py
# =============================================================================

# Label publications per second; Live.send() sleeps 1/dealFps seconds
# between two publications.
dealFps = 1/2

import time
import datetime
import queue
import threading
import subprocess as sp
import numpy as np
import mxnet as mx
import os
os.environ["MXNET_CUDNN_AUTOTUNE_DEFAULT"] = "0"
os.chdir(r'C:/BabyCam')
import gluoncv as gcv
gcv.utils.check_version('0.6.0')
from gluoncv.model_zoo import get_model
print("包加载完毕")
from gluoncv.utils import try_import_cv2
cv = try_import_cv2()

nms_thresh = 0.45
# English meaning of the labels below, in the same order:
# asleep, awake, crying, quilt kicked, side asleep, on stomach, face covered
classes = ['熟睡', '清醒', '哭闹', '踢被', '侧睡', '趴睡', '捂被', ]
dealCtx = [mx.gpu(0)]  # CPU alternative: dealCtx = [mx.cpu(0)]

# Model loading
model_dir = r'C:\BabyCam\model/yolov3/yolo3_darknet53_custom_best.params'
net_name = '_'.join(('yolo3', 'darknet53', 'custom'))


def _load_model(tag):
    """Build one YOLOv3 replica, load its weights and hybridize it.

    Args:
        tag: Letter used in the progress message ("A", "B", ...).

    Returns:
        The loaded, hybridized network.
    """
    model = get_model(net_name, classes=classes, ctx=dealCtx)
    model.load_parameters(model_dir, ctx=dealCtx)
    model.set_nms(nms_thresh=nms_thresh, nms_topk=400)
    mx.nd.waitall()
    model.hybridize()
    print("模型" + tag + "加载完毕")
    return model


# One replica per worker thread. The original code hybridized netC twice and
# never hybridized netD; every replica now goes through the same helper.
net = _load_model("A")
netB = _load_model("B")
netC = _load_model("C")
netD = _load_model("D")

# Warm-up: push one grey dummy frame through every replica so that the first
# real frame does not pay the initialisation cost.
tempframe = mx.nd.array(
    cv.cvtColor(np.ones((720, 1080, 3), np.uint8) * 128, cv.COLOR_BGR2RGB)
).astype('uint8')
rgb_nd, scaled_frame = gcv.data.transforms.presets.yolo.transform_test(
    tempframe, short=416, max_size=1024)
rgb_nd = rgb_nd.as_in_context(dealCtx[0])
for _model in (net, netB, netC, netD):
    class_IDs, scores, bounding_boxes = _model(rgb_nd)
mx.nd.waitall()  # MXNet executes asynchronously; wait for all replicas
scale = 1.0 * tempframe.shape[0] / scaled_frame.shape[0]
# NOTE(review): cv_plot_bbox is unpacked into (image, labels) here, whereas
# the stock GluonCV helper returns only the image -- this presumably relies
# on a locally modified GluonCV; confirm.
img, result = gcv.utils.viz.cv_plot_bbox(
    tempframe.asnumpy(), bounding_boxes[0], scores[0], class_IDs[0],
    class_names=net.classes, scale=scale, thresh=nms_thresh)
del img, tempframe, rgb_nd, class_IDs, scores, bounding_boxes, scale, result
del _model
print("初始化完毕")

from tensorflow.keras.preprocessing.image import array_to_img


def showImg(frame):
    """Open *frame* (an image array) in the system image viewer (debug aid)."""
    array_to_img(frame).show()


class Live(object):
    """Read frames from an RTMP stream and classify them on four threads.

    ``read_frame`` distributes consecutive frames round-robin over four
    single-slot queues; ``dealA`` .. ``dealD`` each consume one queue with
    their own model replica and tally the detected labels; ``send``
    periodically writes the dominant label to ``infoUrl``.
    """

    def __init__(self):
        self.fps = 25
        self.frame_queueA = queue.Queue()
        self.frame_queueB = queue.Queue()
        self.frame_queueC = queue.Queue()
        self.frame_queueD = queue.Queue()
        self.maxqueue = 1  # keep only the newest frame per worker
        self.infoUrl = r"D:\info.html"
        self.camera_path = r"rtmp://0.0.0.0:1936/live/3"  # camera RTMP URL
        self.count = np.zeros(len(classes))  # per-class hits since last send
        self.height = 720
        self.width = 1280
        self.dealTimes = 0  # frames processed since last send
        self.lastShow = int(time.time())
        # count/dealTimes are updated by four workers and reset by send().
        self._stats_lock = threading.Lock()

    def _open_capture(self):
        """Open the camera stream and apply the low-latency settings."""
        cap = cv.VideoCapture(self.camera_path)
        cap.set(cv.CAP_PROP_BUFFERSIZE, 3)  # internal buffer stores 3 frames
        cap.set(cv.CAP_PROP_FOURCC,
                cv.VideoWriter_fourcc('M', 'J', 'P', 'G'))  # lower latency
        return cap

    def _put_latest(self, frame_queue, frame):
        """Drop stale frames from *frame_queue*, then enqueue *frame*.

        ``get_nowait`` is used because a worker may empty the queue between
        the size check and the removal; a blocking ``get`` would then hang
        the reader thread forever.
        """
        while frame_queue.qsize() >= self.maxqueue:
            try:
                frame_queue.get_nowait()
            except queue.Empty:
                break
        frame_queue.put(frame)

    def read_frame(self):
        """Read the stream forever, feeding queues A-D in round-robin order.

        Reconnects (releasing the previous capture handle) whenever the
        stream cannot be opened, reports 0 fps, or a read fails.
        """
        cap = self._open_capture()
        while not cap.isOpened():
            print("尝试重新连接")
            cap.release()
            cap = self._open_capture()
        # Get video information
        self.fps = int(cap.get(cv.CAP_PROP_FPS))
        while self.fps == 0:
            print("尝试重新连接")
            cap.release()
            cap = self._open_capture()
            self.fps = int(cap.get(cv.CAP_PROP_FPS))
        self.width = int(cap.get(cv.CAP_PROP_FRAME_WIDTH))
        self.height = int(cap.get(cv.CAP_PROP_FRAME_HEIGHT))
        print("开启接收", self.camera_path)
        print(self.width, self.height, self.fps)

        frame_queues = (self.frame_queueA, self.frame_queueB,
                        self.frame_queueC, self.frame_queueD)
        while cap.isOpened():
            for frame_queue in frame_queues:
                ret, frame = cap.read()
                if not ret:
                    print("尝试重新连接")
                    while not ret:
                        cap.release()
                        cap = self._open_capture()
                        ret, frame = cap.read()
                    print("重新连接成功")
                self._put_latest(frame_queue, frame)
                del frame

    def _deal(self, frame_queue, model):
        """Worker loop: classify frames from *frame_queue* with *model*.

        Blocks on the queue instead of spinning on ``empty()``, so an idle
        worker does not burn CPU.
        """
        while True:
            frame = frame_queue.get()  # blocks until the reader supplies one
            image = np.asarray(frame, dtype=np.uint8)
            del frame
            # The conversion below was measured as the slowest step.
            frame = mx.nd.array(
                cv.cvtColor(image, cv.COLOR_BGR2RGB)).astype('uint8')
            del image
            rgb_nd, scaled_frame = \
                gcv.data.transforms.presets.yolo.transform_test(
                    frame, short=416, max_size=1024)
            rgb_nd = rgb_nd.as_in_context(dealCtx[0])
            class_IDs, scores, bounding_boxes = model(rgb_nd)
            scale = 1.0 * frame.shape[0] / scaled_frame.shape[0]
            # Only the label list is used; the annotated image is discarded.
            img, result = gcv.utils.viz.cv_plot_bbox(
                frame.asnumpy(), bounding_boxes[0], scores[0], class_IDs[0],
                class_names=model.classes, scale=scale, thresh=nms_thresh)
            del img
            with self._stats_lock:
                self.dealTimes += 1
                for label in result:
                    if label in classes:
                        self.count[classes.index(label)] += 1

    def dealA(self):
        """Worker A: consume ``frame_queueA`` with model ``net``."""
        print("处理A线程开始")
        self._deal(self.frame_queueA, net)

    def dealB(self):
        """Worker B: consume ``frame_queueB`` with model ``netB``."""
        print("处理B线程开始")
        self._deal(self.frame_queueB, netB)

    def dealC(self):
        """Worker C: consume ``frame_queueC`` with model ``netC``."""
        print("处理C线程开始")
        self._deal(self.frame_queueC, netC)

    def dealD(self):
        """Worker D: consume ``frame_queueD`` with model ``netD``."""
        print("处理D线程开始")
        self._deal(self.frame_queueD, netD)

    def send(self):
        """Publish the dominant label to ``infoUrl`` every 1/dealFps seconds.

        If the most frequent label was seen more than twice during the last
        interval it is written as a one-element list; otherwise an empty
        list is written. The counters are reset after every publication.
        """
        time.sleep(8)
        print("信息发送线程开始")
        while True:
            # Snapshot and reset atomically so no worker update is lost.
            with self._stats_lock:
                count = self.count
                deal_times = self.dealTimes
                self.count = np.zeros(len(classes))
                self.dealTimes = 0
            result = [classes[np.argmax(count)]] if not np.max(count) <= 2 else []
            print(count, result, deal_times)
            try:
                with open(self.infoUrl, 'w', encoding='utf8') as f:
                    f.write(str(result))
            except OSError as e:
                # The file may be locked by its reader; retry next round
                # instead of killing the thread.
                print(e)
            time.sleep(1/dealFps)

    def run(self):
        """Start the reader, the four workers and the sender as daemons.

        All threads are daemon threads and are not joined, so the calling
        process has to stay alive (e.g. an interactive console) for them to
        keep running.
        """
        targets = (self.read_frame, self.dealA, self.dealB,
                   self.dealC, self.dealD, self.send)
        threads = [threading.Thread(target=target, daemon=True)
                   for target in targets]
        for thread in threads:
            thread.start()


L = Live()
L.run()


def checkTime():
    """Print the current time and display the next frame of queue A.

    Debug helper; note that it takes the frame away from worker A.
    """
    print(datetime.datetime.strftime(datetime.datetime.now(), '%H:%M:%S'))
    showImg(cv.cvtColor(L.frame_queueA.get(), cv.COLOR_BGR2RGB))
更新后的代码
加入功能:多线程保存图片并定时上传百度网盘,添加历史记录日志。
# -*- coding: utf-8 -*-
"""
Created on Mon Jul 6 22:21:49 2020

@author: Administrator

TCP server that receives JPEG frames from cameras, then either stores them
(and periodically uploads them) or classifies them with a Keras model and
writes one label file per camera.
"""
# =============================================================================
# Tomcat must be running before this script is started.
# =============================================================================

# Label publications per second; Live.send() sleeps 1/dealFps seconds
# between two rounds.
dealFps = 1/2
# Model input window size
frameSize = (600, 600)

import cv2 as cv
import numpy as np
import os
import time
import queue
import threading
import datetime
import socket
import pickle
from queue import Queue
import multiprocessing
from multiprocessing import Process
from tensorflow.keras.preprocessing.image import array_to_img


def showImg(frame):
    """Open *frame* (an image array) in the system image viewer (debug aid)."""
    array_to_img(frame).show()


import tensorflow as tf
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)  # hide WARNINGs
import tensorflow.keras.backend as K
K.clear_session()
from tensorflow.keras.optimizers import Adam
opt = Adam(lr=1e-4)


def get_lr_metric(optimizer):
    """Return a Keras metric function that reports *optimizer*'s learning rate.

    The saved models reference a custom metric named ``lr``; this supplies a
    matching callable for ``load_model``.
    """
    def lr(y_true, y_pred):
        return optimizer.lr
    return lr


lr_metric = get_lr_metric(opt)
from sendEmail import sendEmail

# Presence model (labels: absent / present). It is loaded, but its prediction
# step in Live.deal is currently disabled.
names = ['无', '有']
mPath = r'face_model_0116-193714.h5'
model = tf.keras.models.load_model(mPath, {'lr': lr_metric})
# Facial-state model (labels: asleep / awake)
namesFacial = ['睡', '醒']
mPathFacial = r'ce_model_-193714.h5'
modelFacial = tf.keras.models.load_model(mPathFacial, {'lr': lr_metric})
# Record the graph and session now: worker threads would otherwise see a
# different default graph/session, so they re-enter these before predicting.
# NOTE(review): these are TF1-style calls; under TF2 the tf.compat.v1
# equivalents would be needed -- confirm the installed TensorFlow version.
graph = tf.get_default_graph()
sess = tf.keras.backend.get_session()
print("初始化完毕")


class Live(object):
    """Multi-camera JPEG receiver with optional per-frame classification.

    Each camera connection is served by ``receiveThread``. Depending on
    ``saveOnly`` the frames are written to ``picSaveFolder`` (and uploaded by
    ``uploadData``) or queued for the ``deal`` workers, whose latest result
    per camera is written out by ``send``.
    """

    def __init__(self):
        self.fps = 25
        self.frame_queueA = Queue()  # (camera ID, BGR image) tuples
        self.infoUrlPath = r"LABEL"  # directory of the per-camera label files
        # Every dict/list below carries an "ID" placeholder entry.
        self.ipDict = {"ID": "IP"}
        self.timeDict = {"ID": "TIME"}      # time a camera first registered
        self.lastTimeDict = {"ID": "TIME"}  # time of the last received frame
        self.dealTimes = {"ID": 0}  # frames classified since the last send
        self.onLineID = ["ID", ]
        self.lastShow = int(time.time())
        self.threads = []
        self.stop = True
        local_ip = socket.gethostname()
        # NOTE(review): placeholder value -- a TCP port must be in 0-65535,
        # so bind() rejects this number; replace it with the real port.
        local_port = 12345678901234567890
        ip_port = (local_ip, local_port)
        self.sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sk.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sk.bind(ip_port)
        self.sk.listen(5500)
        print("accept now,wait for client", (local_ip, local_port))
        self.recodes = []     # raw prediction log, pickled by Stop()
        self.sendResult = {}  # camera ID -> latest label
        # Folder for received pictures.
        # NOTE(review): looks like a redacted/placeholder path -- confirm.
        self.picSaveFolder = r'C:n'
        # True: only save pictures; False: run the model on them.
        self.saveOnly = False
        # Was r'C:\' -- a raw string cannot end with a backslash, which made
        # the whole file a SyntaxError.
        self.logDir = 'C:\\'
        self.lastT = {"ID": "TIME"}  # epoch time of the last frame per camera

    def _recv_frame(self, conn):
        """Receive one ``<16-byte camera ID><JPEG>`` message from *conn*.

        Returns:
            ``None`` if the connection ended before a JPEG header was seen,
            otherwise ``(ID, img, conn_end)`` where *img* is the received
            bytes (possibly truncated) and *conn_end* tells whether the
            connection ended while reading the body.
        """
        # socket.timeout is a subclass of OSError, so one handler covers both
        # timeouts and resets. recv() returning b'' means the peer closed the
        # connection; the original code never checked that and spun forever.
        try:
            window = conn.recv(18)  # 16 ID bytes + JPEG start marker
        except OSError:
            return None
        if not window:
            return None
        # Slide an 18-byte window byte by byte until it ends with the marker.
        while window[-2:] != b'\xFF\xD8':
            try:
                one = conn.recv(1)
            except OSError:
                return None
            if not one:
                return None
            window = window[-17:] + one
        ID = window[:16].decode()
        buf = bytearray(b'\xFF\xD8')
        conn_end = False
        while True:
            try:
                data = conn.recv(4096)
            except OSError:
                data = b''
            if not data:
                conn_end = True
                break
            buf += data
            if buf[-2:] == b'\xFF\xD9':  # JPEG end marker
                break
        return ID, bytes(buf), conn_end

    def _end_for_stop(self, conn, ID):
        """Tell the camera that the service is stopping and close *conn*."""
        conn.send(b'st')
        time.sleep(1)
        try:
            self.onLineID.remove(ID)
        except ValueError:
            pass
        conn.close()
        print("接收结束============")

    def receiveThread(self, conn, ip):
        """Serve one camera connection until it ends or the service stops.

        Args:
            conn: Connected socket returned by ``accept``.
            ip: Peer address returned by ``accept``.
        """
        print(datetime.datetime.strftime(datetime.datetime.now(), '%m%d-%H:%M:%S'), "连接到", ip)
        conn.settimeout(10)
        ID = ""
        conn_end = False
        try:
            while not conn_end:
                message = self._recv_frame(conn)
                if message is None:
                    break
                ID, img, conn_end = message
                # Validate the picture
                if not img.startswith(b'\xFF\xD8') or not img.endswith(b'\xFF\xD9'):
                    print("image error")
                    continue
                conn.send(b'ok')
                if self.stop:
                    self._end_for_stop(conn, ID)
                    break

                image = None
                if self.saveOnly:
                    ttta = time.time()
                    picSavePath = os.path.join(
                        self.picSaveFolder,
                        ID + '-' + time.strftime('%m%d-%H%M%S') + '.jpg')
                    try:
                        with open(picSavePath, "wb") as f:
                            f.write(img)
                    except OSError as e:
                        print(e)
                    else:
                        # Report success only when the write succeeded.
                        print('成功保存图片', picSavePath, '用时:', time.time() - ttta)
                else:
                    image = np.asarray(bytearray(img), dtype="uint8")
                    image = cv.imdecode(image, cv.IMREAD_COLOR)
                    if image is None:
                        # imdecode returns None for undecodable data.
                        print("image error")
                        continue
                    # Keep the centre columns (assumes 1280-px-wide frames).
                    image = image[:, 280:1000, :]

                now_str = datetime.datetime.strftime(datetime.datetime.now(), '%m%d-%H:%M:%S')
                self.lastTimeDict[ID] = now_str  # mark camera as alive
                # Register the camera BEFORE queueing its frame so that a
                # worker never meets an ID without a dealTimes entry.
                if ID not in self.ipDict:
                    print("注册新摄像头", ID, ip)
                    self.ipDict[ID] = str(ip)
                    self.dealTimes[ID] = 0
                    self.lastT[ID] = 0.0  # "never seen" marker
                    self.timeDict[ID] = now_str
                if ID not in self.onLineID:
                    self.onLineID.append(ID)
                if image is not None:
                    self.frame_queueA.put((ID, image))
                # More than 30 minutes since the last picture: treat it as
                # the camera coming online again rather than a network
                # hiccup, and notify by e-mail (one camera ID is excluded).
                if time.time() - float(self.lastT[ID]) > 60*30:
                    if 'HH10007GAKEOEEOJ' not in ID:
                        sendEmail('摄像头' + ID + '上线', '摄像头' + ID + '上线')
                self.lastT[ID] = time.time()
                if self.stop:
                    self._end_for_stop(conn, ID)
                    break
        except OSError as e:
            # e.g. the peer vanished while we were sending the acknowledgement
            print("receive thread error", ID, e)
        finally:
            # Always release the socket and mark the camera offline, even if
            # an unexpected error ends the thread.
            conn.close()
            while ID in self.onLineID:
                self.onLineID.remove(ID)
            print("receive thread end", ID, "下线")

    def server(self):
        """Accept camera connections and start one receiver thread for each.

        ``accept`` blocks, so a stop request is only noticed after the next
        connection arrives.
        """
        print("接收服务上线")
        while True:
            conn, addr = self.sk.accept()
            t = threading.Thread(target=self.receiveThread,
                                 args=(conn, addr,), daemon=True)
            t.start()
            if self.stop:
                print("服务结束")
                break

    def deal(self, num):
        """Worker loop: classify queued frames until the service stops.

        Args:
            num: Worker label, used only in log messages.
        """
        print("处理", num, "线程开始")
        while True:
            if self.stop:
                print("处理", num, "线程结束")
                break
            # A timed get replaces "if not empty(): get()": with several
            # workers another thread can take the item between the check and
            # the get, leaving this one blocked and deaf to stop.
            try:
                ID, frame = self.frame_queueA.get(timeout=0.97)
            except queue.Empty:
                continue
            result = ''  # presence-model label; that model is disabled
            # Worker threads must re-enter the graph/session recorded at
            # import time before every prediction.
            with sess.as_default():
                with graph.as_default():
                    img = cv.cvtColor(np.asarray(frame), cv.COLOR_BGR2RGB)
                    print(img.shape)
                    img = cv.resize(img, (frameSize[0], frameSize[1]))
                    img = np.asarray(img, dtype=float).reshape(
                        (1, frameSize[0], frameSize[1], 3)) / 255.0
                    # Facial-state prediction; a single score is expected.
                    predFacial = modelFacial.predict(img, verbose=0, batch_size=1)
                    self.recodes.append([ID, time.time(), 'modelFacial', predFacial])
                    score = float(np.ravel(predFacial)[0])
                    resultFacial = namesFacial[1 if score > 0.5 else 0]
                    print(ID, result, resultFacial, score)
                    self.sendResult[ID] = resultFacial
            del frame
            self.dealTimes[ID] = self.dealTimes.get(ID, 0) + 1

    def uploadData(self):
        """Upload saved pictures in batches once more than 200 have piled up.

        The batch is copied to a temporary folder, uploaded with bypy, and
        on success both the originals and the temporary copy are deleted.
        """
        print("数据上传线程开始")
        import shutil
        from bypy import ByPy
        while True:
            if self.stop:
                break
            fileList = os.listdir(self.picSaveFolder)
            if len(fileList) > 200:
                nowTime = time.strftime('%m%d %H%M')
                newPath = r'C:\Users\Administrator\Desktop\tmp-' + nowTime
                # Create the temporary folder
                if not os.path.exists(newPath):
                    os.mkdir(newPath)
                # Remove anything already in the temporary folder
                for file in os.listdir(newPath):
                    os.remove(os.path.join(newPath, file))
                # Copy the data
                for file in fileList:
                    shutil.copy(os.path.join(self.picSaveFolder, file),
                                os.path.join(newPath, file))
                # Upload the folder
                bp = ByPy()
                # NOTE(review): 60 appears to be bypy's hash-mismatch code
                # (0 is its "no error" code); it is treated as success here
                # exactly as in the original -- confirm this is intended.
                if bp.upload(newPath, r'/cloud/摄像头数据/' + nowTime + r'/') == 60:
                    print('文件上传成功')
                    # Delete the uploaded originals
                    for file in fileList:
                        os.remove(os.path.join(self.picSaveFolder, file))
                    # Delete the temporary folder and its copies. rmtree is
                    # used because os.removedirs would also prune empty
                    # parent directories.
                    shutil.rmtree(newPath)
                else:
                    print('备份上传失败,临时文件夹请见', newPath)
            else:
                if self.stop:
                    break
                time.sleep(60*30)
                if self.stop:
                    break

    def send(self):
        """Write the latest label of every online camera to its HTML file."""
        print("信息发送线程开始")
        while True:
            # Iterate over a copy: receiver threads mutate onLineID.
            for ID in list(self.onLineID):
                if self.stop:
                    break
                if ID == 'ID':  # placeholder entry
                    continue
                if self.dealTimes.get(ID, 0) > 0:
                    try:
                        with open(os.path.join(self.infoUrlPath, ID + ".html"), 'w', encoding='utf8') as f:
                            f.write('[' + self.sendResult[ID] + ']')
                    except PermissionError as ee:
                        print(ee)
                    except Exception as e:
                        print("ERROR while saving html", e)
                    self.dealTimes[ID] = 0
            if self.stop:
                print("信息发送线程结束")
                break
            time.sleep(1/dealFps)

    def Run(self):
        """Start the service threads (all daemons).

        Always starts the accept loop; in ``saveOnly`` mode additionally the
        uploader, otherwise the label sender and five model workers.
        """
        self.stop = False
        # Build a fresh list: threads cannot be started twice, so reusing the
        # list from a previous Run() would raise RuntimeError.
        self.threads = [threading.Thread(target=self.server)]  # accept loop
        if self.saveOnly:
            self.threads.append(threading.Thread(target=self.uploadData))
        else:
            self.threads.append(threading.Thread(target=self.send))
            # queue.Queue cannot be shared between processes, so the workers
            # are threads rather than multiprocessing.Process instances.
            for i in range(5):
                self.threads.append(
                    threading.Thread(target=self.deal, args=(str(i),)))
        for thread in self.threads:
            thread.daemon = True
        for thread in self.threads:
            thread.start()

    def Stop(self):
        """Stop the service, drain the queue and write the log files."""
        self.stop = True
        time.sleep(11)  # longer than the 10 s socket timeout of the receivers
        while True:
            try:
                self.frame_queueA.get_nowait()
            except queue.Empty:
                break
        self.count = np.zeros(8)  # legacy attribute, kept for compatibility
        with open(os.path.join(self.logDir, "runningInf.log"), 'a+', encoding='utf8') as f:
            f.write(str(datetime.datetime.strftime(datetime.datetime.now(), '%m %d-%H:%M:%S')) + ' 服务停止\n')
            f.write("ID&IP" + str(self.ipDict) + '\n')
            f.write("注册时间" + str(self.timeDict) + '\n')
            f.write("关闭时间" + str(self.lastTimeDict) + '\n')
            f.write("log end \n")
        print('日志写入完成')
        if not self.saveOnly:
            logSavePath = os.path.join(
                self.logDir, 'recodes-' + time.strftime('%m%d-%H%M%S') + '.pickle')
            with open(logSavePath, "wb") as f:
                pickle.dump(self.recodes, f)
            print('详细日志写入完成')
        print("停止成功")


L = Live()


def checkTime():
    """Show the next queued frame, retrying every 0.4 s while the queue is empty.

    Debug helper; note that it takes the frame away from the workers.
    """
    # NOTE(review): the source lost its indentation; the retry is assumed to
    # belong to the "queue empty" branch -- confirm.
    try:
        ID, frame = L.frame_queueA.get_nowait()
    except queue.Empty:
        print("队列空")
        time.sleep(0.4)
        runCheck()
        return
    tttt = datetime.datetime.strftime(datetime.datetime.now(), '%H:%M:%S')
    print(tttt, ID)
    frame = cv.resize(cv.cvtColor(frame, cv.COLOR_BGR2RGB),
                      (frameSize[0], frameSize[1]))
    showImg(frame)


def runCheck():
    """Run ``checkTime`` in a background daemon thread."""
    t = threading.Thread(target=checkTime, args=(), daemon=True)
    t.start()


# L.Stop()
# For running from CMD: keep the main thread alive, stop cleanly on exit.
def pretect():
    """Block the main thread until interrupted, then stop the service."""
    try:
        while True:
            time.sleep(1)  # sleep instead of spinning at 100 % CPU
    except Exception as e:
        print(e)
    finally:
        L.Stop()


L.Run()
# pretect()
转载地址:http://yrzti.baihongyu.com/