博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Python搭建RTMP流媒体服务器,实现网络摄像头的推流,并使用机器视觉模型Yolo-v3实时处理视频、输出分类标签。
阅读量:4142 次
发布时间:2019-05-25

本文共 37792 字,大约阅读时间需要 125 分钟。

Python搭建RTMP流媒体服务器,实现网络摄像头的推流,并使用机器视觉模型Yolo-v3实时处理视频、输出分类标签。

对于推流与处理,用到了多线程防止卡死。

# -*- coding: utf-8 -*-"""Created on Mon Jul  6 22:21:49 2020@author: Straka"""# =============================================================================# """# 1 开启TomCat# 2 开启NGINX(cmd)#         C:#         cd C:\Users\Administrator\Desktop\nginx#         start nginx#         nginx -s reload# 3 开启 C:\ffmpeg-4.3-win64-static\bin\MulHYZY_dealOnly-2.py# """# =============================================================================dealFps = 1/2# import gcimport time,datetimeimport queueimport threading# import cv2 as cvimport subprocess as spimport numpy as np# import sys# sys.setrecursionlimit(2**31-1) # 设置栈的大小# sys.path.append("./")import mxnet as mximport osos.environ["MXNET_CUDNN_AUTOTUNE_DEFAULT"] = "0"os.chdir(r'C:/BabyCam')import gluoncv as gcvgcv.utils.check_version('0.6.0')from gluoncv.model_zoo import get_modelprint("包加载完毕")from gluoncv.utils import try_import_cv2cv = try_import_cv2()nms_thresh = 0.45classes = ['asleep','awake','crying','quilt kicked','side asleep','on stomach', 'face covered',]classes = ['熟睡','清醒','哭闹','踢被','侧睡','趴睡', '捂被',]dealCtx = [mx.gpu(0)] # dealCtx = [mx.cpu(0)]# 模型加载model_dir = r'C:\BabyCam\model/yolov3/yolo3_darknet53_custom_best.params'net_name = '_'.join(('yolo3', 'darknet53', 'custom'))net = get_model(net_name, classes=classes, ctx=dealCtx)net.load_parameters(model_dir, ctx=dealCtx)net.set_nms(nms_thresh=0.45, nms_topk=400)mx.nd.waitall()net.hybridize()print("模型A加载完毕")netB = get_model(net_name, classes=classes, ctx=dealCtx)netB.load_parameters(model_dir, ctx=dealCtx)netB.set_nms(nms_thresh=0.45, nms_topk=400)mx.nd.waitall()netB.hybridize()print("模型B加载完毕")netC = get_model(net_name, classes=classes, ctx=dealCtx)netC.load_parameters(model_dir, ctx=dealCtx)netC.set_nms(nms_thresh=0.45, nms_topk=400)mx.nd.waitall()netC.hybridize()print("模型C加载完毕")netD = get_model(net_name, classes=classes, ctx=dealCtx)netD.load_parameters(model_dir, ctx=dealCtx)netD.set_nms(nms_thresh=0.45, nms_topk=400)mx.nd.waitall()netC.hybridize()print("模型D加载完毕")# =============================================================================# netE = get_model(net_name, classes=classes, ctx=dealCtx)# netE.load_parameters(model_dir, ctx=dealCtx)# netE.set_nms(nms_thresh=0.45, nms_topk=400)# mx.nd.waitall()# netE.hybridize()# print("模型E加载完毕")# =============================================================================tempframe = mx.nd.array(cv.cvtColor(np.ones((720, 1080, 3),np.uint8)*128, cv.COLOR_BGR2RGB)).astype('uint8')rgb_nd, scaled_frame = gcv.data.transforms.presets.yolo.transform_test(tempframe, short=416, max_size=1024)rgb_nd = rgb_nd.as_in_context(dealCtx[0])class_IDs, scores, bounding_boxes = net(rgb_nd)class_IDs, scores, bounding_boxes = netB(rgb_nd)class_IDs, scores, bounding_boxes = netC(rgb_nd)class_IDs, scores, bounding_boxes = netD(rgb_nd)# class_IDs, scores, bounding_boxes = netE(rgb_nd)person = np.sum(class_IDs==0)hat = np.sum(class_IDs==1)scale = 1.0 * tempframe.shape[0] / scaled_frame.shape[0]img, result = gcv.utils.viz.cv_plot_bbox(tempframe.asnumpy(), bounding_boxes[0], scores[0], class_IDs[0], class_names=net.classes, scale=scale, thresh=nms_thresh)del img,tempframe,rgb_nd,class_IDs,scores,bounding_boxes,person,hat,scale,resultprint("初始化完毕")from tensorflow.keras.preprocessing.image import array_to_imgdef showImg(frame):    array_to_img(frame).show()# =============================================================================# import datetime# cap = cv.VideoCapture(r"rtmp://0.0.0.0:1936/live/3")# def read():#     now_time = datetime.datetime.strftime(datetime.datetime.now(),'%H:%M:%S')#     ret, frame = cap.read()#     # showImg(frame)#     print(now_time)#     tempframe = mx.nd.array(cv.cvtColor(frame, cv.COLOR_BGR2RGB)).astype('uint8')#     rgb_nd, scaled_frame = gcv.data.transforms.presets.yolo.transform_test(tempframe, short=416, max_size=1024)#     rgb_nd = rgb_nd.as_in_context(dealCtx[0])#     class_IDs, scores, bounding_boxes = net(rgb_nd)#     person = np.sum(class_IDs==0)#     hat = np.sum(class_IDs==1)#     scale = 1.0 * tempframe.shape[0] / scaled_frame.shape[0]#     img, result = gcv.utils.viz.cv_plot_bbox(tempframe.asnumpy(), bounding_boxes[0], scores[0], class_IDs[0], class_names=net.classes, scale=scale, thresh=nms_thresh)#     showImg(img)#     del img,tempframe,frame#     print(result)# read()# cap.release# =============================================================================class Live(object):    def __init__(self):        self.fps=25        self.frame_queueA = queue.Queue()        self.frame_queueB = queue.Queue()        self.frame_queueC = queue.Queue()        self.frame_queueD = queue.Queue()        # self.frame_queueE = queue.Queue()        self.maxqueue = 1        self.infoUrl=r"D:\info.html"        self.camera_path = r"rtmp://0.0.0.0:1936/live/3"        self.count = np.zeros(7)        self.height=720        self.width=1280        self.dealTimes = 0        self.lastShow = int(time.time())              # 摄像头rtmp    def read_frame(self):        cap = cv.VideoCapture(self.camera_path)        while not cap.isOpened():            print("尝试重新连接")            cap = cv.VideoCapture(self.camera_path)        # Get video information        self.fps = int(cap.get(cv.CAP_PROP_FPS))        while self.fps==0:            print("尝试重新连接")            cap = cv.VideoCapture(self.camera_path)            self.fps = int(cap.get(cv.CAP_PROP_FPS))        # self.fps=dealFps # ===================================================        self.width = int(cap.get(cv.CAP_PROP_FRAME_WIDTH))        self.height = int(cap.get(cv.CAP_PROP_FRAME_HEIGHT))        print("开启接收",self.camera_path)        print(self.width,self.height,self.fps)        cap.set(cv.CAP_PROP_BUFFERSIZE, 3); # internal buffer will now store only 3 frames        cap.set(cv.CAP_PROP_FOURCC, cv.VideoWriter_fourcc('M', 'J', 'P', 'G')) # 降低延迟        while cap.isOpened():            # startTime = time.time()            ret, frame = cap.read()            # ret, frame = cap.read()            if ret==False:                print("尝试重新连接")                while ret==False:                    cap = cv.VideoCapture(self.camera_path)                    cap.set(cv.CAP_PROP_BUFFERSIZE, 3); # internal buffer will now store only 3 frames                    cap.set(cv.CAP_PROP_FOURCC, cv.VideoWriter_fourcc('M', 'J', 'P', 'G')) # 降低延迟                    ret,frame = cap.read()                print("重新连接成功")            # put frame into queue            # frame = cv.resize(frame, (self.w, self.h))            # print(gc.collect())            while self.frame_queueA.qsize()>=self.maxqueue:                self.frame_queueA.get()            self.frame_queueA.put(frame)            # print('A',self.frame_queueA.qsize())            del frame            # tt = (1/self.fps+startTime-time.time()) # 开始时间+每帧时间-当前时间-波动            # time.sleep(tt if tt>0 else 0)                        # startTime = time.time()            ret, frame = cap.read()            # ret, frame = cap.read()            if ret==False:                print("尝试重新连接")                while ret==False:                    cap = cv.VideoCapture(self.camera_path)                    cap.set(cv.CAP_PROP_BUFFERSIZE, 3); # internal buffer will now store only 3 frames                    cap.set(cv.CAP_PROP_FOURCC, cv.VideoWriter_fourcc('M', 'J', 'P', 'G')) # 降低延迟                    ret,frame = cap.read()                print("重新连接成功")            # put frame into queue            # frame = cv.resize(frame, (self.w, self.h))            # print(gc.collect())            while self.frame_queueB.qsize()>=self.maxqueue:                self.frame_queueB.get()            self.frame_queueB.put(frame)            # print('B',self.frame_queueB.qsize())            del frame            # tt = (1/self.fps+startTime-time.time()) # 开始时间+每帧时间-当前时间-波动            # time.sleep(tt if tt>0 else 0)                        # startTime = time.time()            # ret, frame = cap.read()            ret, frame = cap.read()            if ret==False:                print("尝试重新连接")                while ret==False:                    cap = cv.VideoCapture(self.camera_path)                    ret,frame = cap.read()                print("重新连接成功")            # put frame into queue            # frame = cv.resize(frame, (self.w, self.h))            # print(gc.collect())            while self.frame_queueC.qsize()>=self.maxqueue:                self.frame_queueC.get()            self.frame_queueC.put(frame)            # print('C',self.frame_queueC.qsize())            del frame            # tt = (1/self.fps+startTime-time.time()) # 开始时间+每帧时间-当前时间-波动            # time.sleep(tt if tt>0 else 0)                        # startTime = time.time()            # ret, frame = cap.read()            ret, frame = cap.read()            if ret==False:                print("尝试重新连接")                while ret==False:                    cap = cv.VideoCapture(self.camera_path)                    ret,frame = cap.read()                print("重新连接成功")            # put frame into queue            # frame = cv.resize(frame, (self.w, self.h))            # print(gc.collect())            while self.frame_queueD.qsize()>=self.maxqueue:                self.frame_queueD.get()            self.frame_queueD.put(frame)            # print('D',self.frame_queueD.qsize())            del frame            # tt = (1/self.fps+startTime-time.time()) # 开始时间+每帧时间-当前时间-波动            # tt = (1/self.fps*4+startTime-time.time())             # time.sleep(tt if tt>0 else 0) # 这里要缓一下 服务器没这么好 暂时设置每四帧停一下            # =============================================================================#             if int(time.time())-self.lastShow>=5:#                 print(datetime.datetime.strftime(datetime.datetime.now(),'%H:%M:%S'))#                 ret, frame = cap.read()#                 showImg(frame)#                 self.lastShow = int(time.time())# =============================================================================# =============================================================================#             # startTime = time.time()#             # ret, frame = cap.read()#             ret, frame = cap.read()#             if ret==False:#                 print("尝试重新连接")#                 while ret==False:#                     cap = cv.VideoCapture(self.camera_path)#                     ret,frame = cap.read()#                 print("重新连接成功")#             # put frame into queue#             # frame = cv.resize(frame, (self.w, self.h))#             # print(gc.collect())#             while self.frame_queueE.qsize()>=self.maxqueue:#                 self.frame_queueE.get()#             self.frame_queueE.put(frame)#             # print('D',self.frame_queueD.qsize())#             del frame#             # tt = (1/self.fps+startTime-time.time()) # 开始时间+每帧时间-当前时间-波动#             # time.sleep(tt if tt>0 else 0) # 这里要缓一下 服务器没这么好 暂时设置每四帧停一下# =============================================================================                            def dealA(self):        print("处理A线程开始")        while True:            if self.frame_queueA.empty() != True:                t1=time.time()                frame = self.frame_queueA.get()#取出队头                image = np.asarray(frame, dtype=np.uint8)                del frame                frame = mx.nd.array(cv.cvtColor(image, cv.COLOR_BGR2RGB)).astype('uint8')                del image                # 以上两句所用时间最多                rgb_nd, scaled_frame = gcv.data.transforms.presets.yolo.transform_test(frame, short=416, max_size=1024)                rgb_nd = rgb_nd.as_in_context(dealCtx[0])                class_IDs, scores, bounding_boxes = net(rgb_nd)                person = np.sum(class_IDs==0)                hat = np.sum(class_IDs==1)                scale = 1.0 * frame.shape[0] / scaled_frame.shape[0]                img, result = gcv.utils.viz.cv_plot_bbox(frame.asnumpy(), bounding_boxes[0], scores[0], class_IDs[0], class_names=net.classes, scale=scale, thresh=nms_thresh)                del img                self.dealTimes += 1                for x in result:                    if classes.count(x)>0:                        i = classes.index(x)                        self.count[i]+=1                # print('--A', result,time.time()-t1)                    def dealB(self):        print("处理B线程开始")        while True:            if self.frame_queueB.empty() != True:                t2=time.time()                frame = self.frame_queueB.get()                image = np.asarray(frame, dtype=np.uint8)                frame = mx.nd.array(cv.cvtColor(image, cv.COLOR_BGR2RGB)).astype('uint8')                # 以上两句所用时间最多                rgb_nd, scaled_frame = gcv.data.transforms.presets.yolo.transform_test(frame, short=416, max_size=1024)                rgb_nd = rgb_nd.as_in_context(dealCtx[0])                class_IDs, scores, bounding_boxes = netB(rgb_nd)                person = np.sum(class_IDs==0)                hat = np.sum(class_IDs==1)                scale = 1.0 * frame.shape[0] / scaled_frame.shape[0]                img, result = gcv.utils.viz.cv_plot_bbox(frame.asnumpy(), bounding_boxes[0], scores[0], class_IDs[0], class_names=net.classes, scale=scale, thresh=nms_thresh)                for x in result:                    if classes.count(x)>0:                        i = classes.index(x)                        self.count[i]+=1                self.dealTimes += 1                # print('---B', result,time.time()-t2)                    def dealC(self):        print("处理C线程开始")        while True:            if self.frame_queueC.empty() != True:                t3=time.time()                frame = self.frame_queueC.get()#取出队头                image = np.asarray(frame, dtype=np.uint8)                frame = mx.nd.array(cv.cvtColor(image, cv.COLOR_BGR2RGB)).astype('uint8')                # 以上两句所用时间最多                rgb_nd, scaled_frame = gcv.data.transforms.presets.yolo.transform_test(frame, short=416, max_size=1024)                rgb_nd = rgb_nd.as_in_context(dealCtx[0])                class_IDs, scores, bounding_boxes = netC(rgb_nd)                person = np.sum(class_IDs==0)                hat = np.sum(class_IDs==1)                scale = 1.0 * frame.shape[0] / scaled_frame.shape[0]                img, result = gcv.utils.viz.cv_plot_bbox(frame.asnumpy(), bounding_boxes[0], scores[0], class_IDs[0], class_names=net.classes, scale=scale, thresh=nms_thresh)                for x in result:                    if classes.count(x)>0:                        i = classes.index(x)                        self.count[i]+=1                self.dealTimes += 1                # print('----C', result,time.time()-t3)                                    def dealD(self):        print("处理D线程开始")        while True:            if self.frame_queueD.empty() != True:                t4=time.time()                frame = self.frame_queueD.get()#取出队头                image = np.asarray(frame, dtype=np.uint8)                frame = mx.nd.array(cv.cvtColor(image, cv.COLOR_BGR2RGB)).astype('uint8')                # 以上两句所用时间最多                rgb_nd, scaled_frame = gcv.data.transforms.presets.yolo.transform_test(frame, short=416, max_size=1024)                rgb_nd = rgb_nd.as_in_context(dealCtx[0])                class_IDs, scores, bounding_boxes = netD(rgb_nd)                person = np.sum(class_IDs==0)                hat = np.sum(class_IDs==1)                scale = 1.0 * frame.shape[0] / scaled_frame.shape[0]                img, result = gcv.utils.viz.cv_plot_bbox(frame.asnumpy(), bounding_boxes[0], scores[0], class_IDs[0], class_names=net.classes, scale=scale, thresh=nms_thresh)                for x in result:                    if classes.count(x)>0:                        i = classes.index(x)                        self.count[i]+=1                self.dealTimes += 1                # print('-----D', result,time.time()-t4)               # =============================================================================#     def dealE(self):#         print("处理E线程开始")#         while True:#             if self.frame_queueE.empty() != True:#                 t4=time.time()#                 frame = self.frame_queueE.get()#取出队头#                 image = np.asarray(frame, dtype=np.uint8)#                 frame = mx.nd.array(cv.cvtColor(image, cv.COLOR_BGR2RGB)).astype('uint8')#                 # 以上两句所用时间最多#                 rgb_nd, scaled_frame = gcv.data.transforms.presets.yolo.transform_test(frame, short=416, max_size=1024)#                 rgb_nd = rgb_nd.as_in_context(dealCtx[0])#                 class_IDs, scores, bounding_boxes = netE(rgb_nd)#                 person = np.sum(class_IDs==0)#                 hat = np.sum(class_IDs==1)#                 scale = 1.0 * frame.shape[0] / scaled_frame.shape[0]#                 img, result = gcv.utils.viz.cv_plot_bbox(frame.asnumpy(), bounding_boxes[0], scores[0], class_IDs[0], class_names=net.classes, scale=scale, thresh=nms_thresh)#                 for x in result:#                     if classes.count(x)>0:#                         i = classes.index(x)#                         self.count[i]+=1#                 self.dealTimes += 1#                 # print('-----D', result,time.time()-t4)# =============================================================================                                    def send(self):        time.sleep(8)        print("信息发送线程开始")        while True:            result=[classes[np.argmax(self.count)]] if not np.max(self.count)<=2 else []            # 如果三秒内某一标签最大出现次数大于两次,则计入出现次数最多的标签,作为结果输出,否则为空标签            print(self.count,result,self.dealTimes)            with open(self.infoUrl,'w',encoding='utf8') as f:                f.write(str(result))            self.count = np.zeros(7)            self.dealTimes = 0            time.sleep(1/dealFps)                    def run(self):        # 多线程处理        threads = [            threading.Thread(target=Live.read_frame, args=(self,)),            threading.Thread(target=Live.dealA, args=(self,)),            threading.Thread(target=Live.dealB, args=(self,)),            threading.Thread(target=Live.dealC, args=(self,)),            threading.Thread(target=Live.dealD, args=(self,)),            # threading.Thread(target=Live.dealE, args=(self,)),            threading.Thread(target=Live.send, args=(self,)),        ]        [thread.setDaemon(True) for thread in threads]        [thread.start() for thread in threads]        # os.chdir(r'C:\ffmpeg-4.3-win64-static\bin') # ffmpeg所在地址L = Live()L.run()def checkTime():    print(datetime.datetime.strftime(datetime.datetime.now(),'%H:%M:%S'))    showImg(cv.cvtColor(L.frame_queueA.get(), cv.COLOR_BGR2RGB))

更新后的代码

加入功能:多线程保存图片并定时上传百度网盘,添加历史记录日志。

# -*- coding: utf-8 -*-"""Created on Mon Jul  6 22:21:49 2020@author: Administrator"""# =============================================================================# """# 请开启TomCat# """# =============================================================================dealFps = 1/2# 模型窗口大小frameSize = (600,600)import cv2 as cvimport numpy as npimport os,time,queue,threading,datetime,socket,picklefrom queue import Queueimport multiprocessingfrom multiprocessing import Processfrom tensorflow.keras.preprocessing.image import array_to_imgdef showImg(frame):    array_to_img(frame).show()import tensorflow as tftf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR) # 屏蔽WARNING★★import tensorflow.keras.backend as KK.clear_session()from tensorflow.keras.optimizers import Adamopt = Adam(lr=1e-4)def get_lr_metric(optimizer):  # printing the value of the learning rate        def lr(y_true, y_pred):            return optimizer.lr        return lrlr_metric = get_lr_metric(opt)from sendEmail import sendEmail# 加载有无模型names = ['无', '有']mPath = r'face_model_0116-193714.h5'model = tf.keras.models.load_model(mPath,{
'lr': lr_metric}) ## 加载表情模型namesFacial = ['睡', '醒']mPathFacial = r'ce_model_-193714.h5'modelFacial = tf.keras.models.load_model(mPathFacial,{
'lr': lr_metric}) ## 记录图和会话graph = tf.get_default_graph()sess = tf.keras.backend.get_session()'''★★注意!!!多线程会导致sess和graph不同 必须提前记录下★★'''# model.predict(np.zeros((1,frameSize[0],frameSize[1],3)), verbose=0, batch_size=1)# modelFacial.predict(np.zeros((1,frameSize[0],frameSize[1],3)), verbose=0, batch_size=1)print("初始化完毕")class Live(object): def __init__(self): self.fps=25 self.frame_queueA = Queue() # self.frame_queueA = multiprocessing.Manager().Queue() # self.maxqueue = 10 self.infoUrlPath=r"LABEL" # self.count = np.zeros(7) # self.countDict = {"ID":np.zeros(8)} self.ipDict = {
"ID":"IP"} self.timeDict = {
"ID":"TIME"}#摄像头上线时间 self.lastTimeDict = {
"ID":"TIME"}#最后一次上线时间 self.dealTimes = {
"ID":0} #两次发送更新标签间的模型处理次数 self.onLineID = ["ID",] self.lastShow = int(time.time()) self.threads = [] self.stop = True local_ip = socket.gethostname() local_port = 12345678901234567890 ip_port = (local_ip, local_port) self.sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sk.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.sk.bind(ip_port) self.sk.listen(5500) print("accept now,wait for client", (local_ip, local_port)) self.recodes = [] self.sendResult = {
} # 接受图片保存路径 self.picSaveFolder = r'C:n' # 是保存图片还是进行预测 self.saveOnly = False self.logDir = r'C:\' self.lastT = {
"ID":"TIME"} def receiveThread(self,conn,ip): print(datetime.datetime.strftime(datetime.datetime.now(),'%m%d-%H:%M:%S'),"连接到",ip) conn.settimeout(10) conn_end = False pack_size = 1024*5 ID = "" while True: if conn_end: break img = b"" tmp = b"" try: tmp = conn.recv(18) #开始接收 except socket.timeout: conn_end = True while True: # print(tmp) if tmp[-2:] == b'\xFF\xD8': # 如果读到图片头 ID = tmp[:16].decode() #str 解析出摄像头ID img = b'\xFF\xD8' break # 图片头接收结束 try: client_data = conn.recv(1) except socket.timeout: conn_end = True break tmp = tmp[-17:] + client_data while True: try: client_data = conn.recv(4096) except socket.timeout: client_data = None conn_end = True if not client_data: break # print("received data,len:",len(client_data) ) img += client_data if img[-2:] == b'\xFF\xD9': #检查图片尾 break if len(client_data) > pack_size: break # print("recive end, pic len:", len(img)) # 检验图片 if not img.startswith(b'\xFF\xD8') or not img.endswith(b'\xFF\xD9'): print("image error") continue conn.send(b'ok') if self.stop: conn.send(b'st') time.sleep(1) try: self.onLineID.remove(ID) except: pass conn.close() print("接收结束============") break if self.saveOnly : ttta=time.time() picSavePath = os.path.join(self.picSaveFolder,ID+'-'+time.strftime('%m%d-%H%M%S')+'.jpg') try: with open(picSavePath, "wb") as f: f.write(img) except Exception as e: print(e) print('成功保存图片',picSavePath,'用时:',time.time()-ttta) else: image = np.asarray(bytearray(img), dtype="uint8") image = cv.imdecode(image, cv.IMREAD_COLOR)[:,280:1000,:]#切分出中心画面 #加入队列 # image = cv.cvtColor(image, cv.COLOR_BGR2RGB) self.frame_queueA.put((ID,image)) self.lastTimeDict[ID] = datetime.datetime.strftime(datetime.datetime.now(),'%m%d-%H:%M:%S')#更新摄像头在线 # print(image.shape) # 检查ID ex = False for exID in list(self.ipDict): if ID == exID: ex = True if not ex: print("注册新摄像头",ID,ip) # self.countDict[ID] = np.zeros(8) self.ipDict[ID] = str(ip) self.dealTimes[ID] = 0 self.lastT[ID] = 0.0 # 标记时间为未上线 self.timeDict[ID] = datetime.datetime.strftime(datetime.datetime.now(),'%m%d-%H:%M:%S') if ID not in self.onLineID: self.onLineID.append(ID) #如果距离上次收到图片十分钟,判定为摄像头重新上线而非网络不稳定 if (time.time()-float(self.lastT[ID])>60*30): if(ID.count('HH10007GAKEOEEOJ')==0): sendEmail('摄像头'+ID+'上线','摄像头'+ID+'上线') # 更新时间 self.lastT[ID] = time.time() if self.stop: conn.send(b'st') time.sleep(1) try: self.onLineID.remove(ID) except: pass conn.close() print("接收结束============") break conn.close() while self.onLineID.count(ID)>=1: self.onLineID.remove(ID) print("receive thread end",ID,"下线") # print(datetime.datetime.strftime(datetime.datetime.now(),'%m%d-%H:%M:%S'),"连接到",ip) # conn.settimeout(5) # conn_end = False # pack_size = 1024*5 # myID = "" # while True: # if conn_end: # break # img = b"" # tmp = b"" # ID = "" # try: # tmp = conn.recv(18) # print(tmp) # except socket.timeout: # conn_end = True # break # except Exception as e: # print("Recv ERR"+e) # conn_end = True # break # if conn_end: # break # while True: # # print(tmp) # if tmp[-2:] == b'\xFF\xD8': # ID = tmp[:16].decode() #str # img = b'\xFF\xD8' # break # try: # client_data = conn.recv(1) # except socket.timeout: # conn_end = True # break # except Exception as e: # print("ERROR lost conn",datetime.datetime.strftime(datetime.datetime.now(),'%m%d-%H:%M:%S'),e,ID,ip) # if self.onLineID.count(ID)==1: # self.onLineID.remove(ID) # conn.close() # ID = "" # break # tmp = tmp[-17:] + client_data # if conn_end: # break # if ID == "": # break # while len(ID)==16: # try: # client_data = conn.recv(4096) # except socket.timeout: # client_data = None # conn_end = True # if not client_data: # break # # print("received data,len:",len(client_data) ) # img += client_data # if img[-2:] == b'\xFF\xD9': # conn.send(b'ok') # break # # if len(client_data) > pack_size: # # break # # print("recive end, pic len:", len(img)) # # 检验图片 # if not img.startswith(b'\xFF\xD8') or not img.endswith(b'\xFF\xD9'): # print("image error") # continue # if self.stop: # conn.send(b'st') # time.sleep(1) # conn.close() # print("接收结束============") # break # image = numpy.asarray(bytearray(img), dtype="uint8") # image = cv.imdecode(image, cv.IMREAD_COLOR) # self.lastTimeDict[ID] = datetime.datetime.strftime(datetime.datetime.now(),'%m%d-%H:%M:%S') # if self.onLineID.count(ID)==0: # self.onLineID.append(ID) # # print(image.shape) # # 检查ID # ex = False # for exID in list(self.countDict): # if ID == exID: # ex = True # if not ex: # print("注册新摄像头",ID,image.shape,ip) # self.countDict[ID] = np.zeros(7) # self.ipDict[ID] = str(ip) # self.dealTimes[ID] = 0 # self.timeDict[ID] = datetime.datetime.strftime(datetime.datetime.now(),'%m%d-%H:%M:%S') # myID = ID # # self.onLineID.append(ID) # self.frame_queueA.put((ID,image)) # conn.close() # while self.onLineID.count(ID)>=1: # self.onLineID.remove(ID) # while self.onLineID.count(myID)>=1: # self.onLineID.remove(myID) # print("receive thread end",myID,"下线") def server(self): print("接收服务上线") while True: conn, addr = self.sk.accept() t = threading.Thread(target=self.receiveThread, args=(conn,addr,)) t.setDaemon(True) t.start() if self.stop: print("服务结束") break def deal(self,num): print("处理",num,"线程开始") while True: if self.stop: print("处理",num,"线程结束") break if self.frame_queueA.empty() != True: ID,frame = self.frame_queueA.get()#取出队头 result = '' with sess.as_default(): with graph.as_default(): # 注意!!!多线程会导致sess和graph不同 必须在每次预测前手动更改 img = cv.cvtColor(np.asarray(frame), cv.COLOR_BGR2RGB) # 这里切出中间的部分,放进模型预测,但是app输出还是720*1280 # img = img[int(720/4):int(3*720/4),int(1280/4+100):int(3*1280/4-100),:] print(img.shape) img = cv.resize(img, (frameSize[0],frameSize[1])) img = np.asarray(img,dtype=float).reshape((1,frameSize[0],frameSize[1],3))/255.0 # 检测有无 # pred = model.predict(img, verbose=0, batch_size=1) #这里输出二维 # self.recodes.append([ID,time.time(),'modelYesOrNo',pred]) # result = names[int(np.argmax(pred,axis=1))] # if result==names[1]: if True: # 检测表情 predFacial = modelFacial.predict(img, verbose=0, batch_size=1) #这里输出一维 self.recodes.append([ID,time.time(),'modelFacial',predFacial]) resultFacial = namesFacial[1 if float(predFacial)>0.5 else 0] print(ID, result, resultFacial, float(predFacial)) self.sendResult[ID]=resultFacial else: print(ID, result, pred) self.sendResult[ID]=result del frame self.dealTimes[ID] += 1 if self.stop: print("处理",num,"线程结束") break else: # 队列空 休息会 time.sleep(0.97) # with sess.as_default(): # with graph.as_default(): # tt = time.time() # bs = 48 # 32对应0.64s 48对应0.95s 24对应0.48s # model.predict(np.zeros((bs,frameSize[0],frameSize[1],3)), verbose=0, batch_size=bs) # modelFacial.predict(np.zeros((bs,frameSize[0],frameSize[1],3)), verbose=0, batch_size=bs) # print('处理A进程堕态',time.time()-tt) def uploadData(self): print("数据上传线程开始") import shutil,os,time from bypy import ByPy while True: if self.stop: break fileList = os.listdir(self.picSaveFolder) if len(fileList)>200: nowTime = time.strftime('%m%d %H%M') newPath = r'C:\Users\Administrator\Desktop\tmp-'+nowTime # 新建临时文件夹 if not os.path.exists(newPath): os.mkdir(newPath) # 清空临时文件夹已有文件 for file in os.listdir(newPath): os.remove(os.path.join(newPath,file)) # 复制数据 for file in fileList: shutil.copy(os.path.join(self.picSaveFolder,file), os.path.join(newPath,file)) # 上传 bp=ByPy() if(bp.upload(newPath,r'/cloud/摄像头数据/'+nowTime+r'/')==60): # 上传文件夹 print('文件上传成功') # 删除已上传文件 for file in fileList: os.remove(os.path.join(newPath,file)) for file in fileList: os.remove(os.path.join(self.picSaveFolder,file)) # 删除临时文件夹 os.removedirs(newPath) else: print('备份上传失败,临时文件夹请见',newPath) else: if self.stop: break time.sleep(60*30) if self.stop: break def send(self): print("信息发送线程开始") while True: for ID in self.onLineID: if self.stop: break if ID=='ID': continue # result=[classes[np.argmax(self.countDict[ID])]] if not np.max(self.countDict[ID])<=0 else [] # 如果三秒内某一标签最大出现次数大于两次,则计入出现次数最多的标签,作为结果输出,否则为空标签 if self.dealTimes[ID]>0: # print(ID,self.countDict[ID],result,self.dealTimes[ID]) try: with open(os.path.join(self.infoUrlPath,ID+".html"),'w',encoding='utf8') as f: # f.write(str(result)) f.write('['+self.sendResult[ID]+']') # print("update label html ok") except PermissionError as ee: print(ee) except Exception as e: print("ERROR while saving html",e) # self.countDict[ID] = np.zeros(8) self.dealTimes[ID] = 0 if self.stop: print("信息发送线程结束") break time.sleep(1/dealFps) def Run(self): # 多线程处理 self.stop = False self.threads.append(threading.Thread(target=Live.server, args=(self,))) #接收线程 if self.saveOnly: self.threads.append(threading.Thread(target=Live.uploadData, args=(self,))) #上传线程 else: self.threads.append(threading.Thread(target=Live.send, args=(self,))) #发送标签 for i in range(5): self.threads.append(threading.Thread(target=Live.deal, args=(self,str(i)))) #模型处理 # 打开线程 [thread.setDaemon(True) for thread in self.threads] [thread.start() for thread in self.threads] #多线程# =============================================================================# self.threads = [# Process(target=Live.deal, args=(self,'0')),# Process(target=Live.deal, args=(self,'1')),# Process(target=Live.deal, args=(self,'2')),# Process(target=Live.deal, args=(self,'3')),# Process(target=Live.send, args=(self,)),# Process(target=Live.server, args=(self,)),# ]# [thread.start() for thread in self.threads] #queue.Queue不能用于多进程# ============================================================================= def Stop(self): self.stop = True time.sleep(11) while not self.frame_queueA.empty(): self.frame_queueA.get() self.count = np.zeros(8) with open(os.path.join(self.logDir,"runningInf.log"),'a+',encoding='utf8') as f: f.write(str(datetime.datetime.strftime(datetime.datetime.now(),'%m %d-%H:%M:%S'))+' 服务停止\n') f.write("ID&IP"+str(self.ipDict)+'\n') f.write("注册时间"+str(self.timeDict)+'\n') f.write("关闭时间"+str(self.lastTimeDict)+'\n') f.write("log end \n") print('日志写入完成') if not self.saveOnly: logSavePath = os.path.join(self.logDir,'recodes-'+time.strftime('%m%d-%H%M%S')+'.pickle') with open(logSavePath, "wb") as f: f.write(pickle.dumps(self.recodes)) print('详细日志写入完成') print("停止成功") L = Live()def checkTime(): if(not L.frame_queueA.empty()): tttt = datetime.datetime.strftime(datetime.datetime.now(),'%H:%M:%S') ID,frame = L.frame_queueA.get() print(tttt,ID) frame = cv.resize(cv.cvtColor(frame, cv.COLOR_BGR2RGB), (frameSize[0],frameSize[1])) showImg(frame) else: print("队列空") time.sleep(0.4) runCheck() def runCheck(): t = threading.Thread(target=checkTime, args=()) t.setDaemon(True) t.start() # L.Stop()# 在CMD中运行def pretect(): try: while True: pass except Exception as e: print(e) finally: L.Stop()L.Run()# pretect()

转载地址:http://yrzti.baihongyu.com/

你可能感兴趣的文章
C 语言 学习---回调、时间定时更新程序
查看>>
C 语言 学习---复选框及列表框的使用
查看>>
第十一章 - 直接内存
查看>>
JDBC核心技术 - 上篇
查看>>
一篇搞懂Java反射机制
查看>>
Single Number II --出现一次的数(重)
查看>>
Palindrome Partitioning --回文切割 深搜(重重)
查看>>
对话周鸿袆:从程序员创业谈起
查看>>
Mysql中下划线问题
查看>>
Xcode 11 报错,提示libstdc++.6 缺失,解决方案
查看>>
idea的安装以及简单使用
查看>>
Windows mysql 安装
查看>>
python循环语句与C语言的区别
查看>>
vue 项目中图片选择路径位置static 或 assets区别
查看>>
vue项目打包后无法运行报错空白页面
查看>>
Vue 解决部署到服务器后或者build之后Element UI图标不显示问题(404错误)
查看>>
element-ui全局自定义主题
查看>>
facebook库runtime.js
查看>>
vue2.* 中 使用socket.io
查看>>
openlayers安装引用
查看>>