Terug

Emotie detectie

2024

Project

In dit project heb ik Python en Arduino gecombineerd om een ​​interactief systeem te creëren dat reageert op menselijke uitdrukkingen. Met behulp van Python analyseerde ik gezichtsuitdrukkingen in realtime en detecteerde wanneer iemand glimlachte of lachte. Deze gegevens werden vervolgens als commando naar de Arduino gestuurd, die als reactie een waterpomp activeerde.


Uitleg code:

Bibliotheken: Je moet de volgende Python-bibliotheken installeren. Dit kan meestal met pip, de package manager van Python. Open je terminal of command prompt en voer de volgende commando's uit:

pip install opencv-python
pip install face_recognition
pip install fer
  1. Importeren van bibliotheken
import cv2
import face_recognition
from fer import FER
import time
from threading import Thread
from queue import Queue

Hier importeren we alle benodigde bibliotheken.

  1. Initialiseren van variabelen
known_face_encodings = []
known_face_ids = []
next_id = 1
last_seen = {}
timeout = 600  # 10 minutes in seconds
face_detection_interval = 2
emotion_detection_interval = 1

known_face_encodings: Een lijst om gezichtscoderingen van bekende gezichten op te slaan.

known_face_ids: Een lijst om ID's van bekende gezichten op te slaan.

next_id: Een teller voor het genereren van nieuwe ID's.

last_seen: Een dictionary om de tijd bij te houden dat elk gezicht voor het laatst is gezien.

timeout: De tijd (in seconden) waarna een gezicht wordt "vergeten".

face_detection_interval: Bepaalt hoe vaak de code naar nieuwe gezichten zoekt, hoe hoger de waarde hoe minder vaak de code gezichten herkent.

emotion_detection_interval: Bepaalt hoe vaak de code naar emoties zoekt.

  1. Initialiseren van de emotiedetector en video capture
emotion_detector = FER(mtcnn=True)
video_capture = cv2.VideoCapture(0)
video_capture.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
video_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

FER(mtcnn=True): Initialiseert de Face Emotion Recognizer met MTCNN voor gezichtsdetectie.

cv2.VideoCapture(0): Start de webcam (0 is de standaard webcam).

video_capture.set(): Stelt de resolutie van de video in.

  1. Afdrukken van Startbericht
print("Programma gestart. Druk op 'q' om te stoppen.")

Druk een bericht af dat het programma gestart is

  1. Queue initialisatie en frame count
frame_count = 0
face_queue = Queue(maxsize=5)
emotion_queue = Queue(maxsize=5)

frame_count: Houdt bij welk frame verwerkt wordt om bewerkingen om de zoveel frames uit te voeren

face_queue: Een queue (wachtrij) om de gedetecteerde gezichten en coderingen door te geven aan de main thread.

emotion_queue: Een queue om de gedetecteerde emoties door te geven aan de main thread.

  1. Process Face functie
def process_faces(frame):
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    face_locations = face_recognition.face_locations(rgb_frame, model="hog")
    face_encodings = face_recognition.face_encodings(rgb_frame, face_locations)
    face_queue.put((face_locations, face_encodings))

Deze functie wordt op een aparte thread uitgevoerd en detecteert gezichten in een frame.

De resultaten (locaties en coderingen) worden in face_queue geplaatst.

  1. Process Emotion functie
def process_emotions(frame, face_location):
    top, right, bottom, left = face_location
    face_image = frame[top:bottom, left:right]
    emotions = emotion_detector.detect_emotions(face_image)
    emotion_queue.put((face_location, emotions))

Deze functie, ook op een aparte thread, detecteert emoties in een gezichtsgebied.

De resultaten (locatie en emoties) worden in emotion_queue geplaatst.

  1. Main loop
try:
    while True:
        ret, frame = video_capture.read()
        if not ret:
            print("Kan geen frame van de camera lezen. Opnieuw proberen...")
            time.sleep(1)
            continue

        frame_count += 1
        current_time = time.time()

        if frame_count % face_detection_interval == 0:
            Thread(target=process_faces, args=(frame.copy(),)).start()

        if not face_queue.empty():
            face_locations, face_encodings = face_queue.get()
            for (top, right, bottom, left), face_encoding in zip(face_locations, face_encodings):
                matches = face_recognition.compare_faces(known_face_encodings, face_encoding)
                face_id = "Onbekend"

                if True in matches:
                    first_match_index = matches.index(True)
                    face_id = known_face_ids[first_match_index]
                else:
                    face_id = f"Persoon {next_id}"
                    known_face_encodings.append(face_encoding)
                    known_face_ids.append(face_id)
                    next_id += 1

                last_seen[face_id] = current_time

                cv2.rectangle(frame, (left, top), (right, bottom), (0, 255, 0), 2)
                cv2.putText(frame, face_id, (left, top - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)

                if frame_count % emotion_detection_interval == 0:
                    Thread(target=process_emotions, args=(frame.copy(), (top, right, bottom, left))).start()

        if not emotion_queue.empty():
            face_location, emotions = emotion_queue.get()
            if emotions:
                top, right, bottom, left = face_location
                dominant_emotion = max(emotions[0]['emotions'], key=emotions[0]['emotions'].get)
                cv2.putText(frame, f"Emotion: {dominant_emotion}", (left, bottom + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
                
        for face_id in list(last_seen.keys()):
            if current_time - last_seen[face_id] > timeout:
                if face_id in known_face_ids:
                    index = known_face_ids.index(face_id)
                    known_face_encodings.pop(index)
                    known_face_ids.pop(index)
                del last_seen[face_id]
                print(f"Gezicht met ID {face_id} verwijderd wegens inactiviteit.")
                
        cv2.imshow('Video', frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

video_capture.read(): Leest een frame van de webcam.

Face detectie thread: Als het interval is bereikt, start een thread om gezichten te detecteren

Face handling: Haalt gezichten uit de queue. Vergelijkt met bekende gezichten of maakt een nieuw gezicht aan. Tekent een rechthoek om het gezicht en plaatst de ID.

Emotion detectie thread: Als het interval is bereikt, start een thread om de emoties op het gezicht te detecteren

Emotion handling: Haalt de locatie van het gezicht en de bijbehorende emotie uit de queue, en tekent de emotie op het scherm.

Inactiviteitscontrole: Verwijdert gezichten die te lang niet zijn gezien.

cv2.imshow(): Toont de video met herkenning en emoties.

cv2.waitKey(): Wacht op een toetsaanslag. Stopt de loop als 'q' wordt ingedrukt.

  1. Afsluitprocedures
except KeyboardInterrupt:
    print("Programma onderbroken door gebruiker.")

finally:
    video_capture.release()
    cv2.destroyAllWindows()
    print("Programma beëindigd.")

KeyboardInterrupt: Vangt een toetsenbord onderbreking op (bijv. Ctrl+C).

finally:: Sluit de webcam en de vensters af, zelfs als er een fout optreedt.


Complete code:

import cv2
import face_recognition
from fer import FER
import time
from threading import Thread
from queue import Queue

# Initialize variables
known_face_encodings = []
known_face_ids = []
next_id = 1
last_seen = {}
timeout = 600  # 10 minutes in seconds
face_detection_interval = 2
emotion_detection_interval = 1 # Bepaal hoe vaak die het scherm checkt, interval checkt om de 3 frames. Lager interval is meer intensief 

# Initialize emotion detector
emotion_detector = FER(mtcnn=True)

# Video capture
video_capture = cv2.VideoCapture(0) # 0 staat voor de interne camera, switch naar 1 voor de eerste externe camera. // cap = cv2.VideoCapture(0, cv2.CAP_DSHOW) gebruil dit als de externe camera niet werkt.
video_capture.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
video_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) #Beeld grote bepalen

print("Programma gestart. Druk op 'q' om te stoppen.")

frame_count = 0
face_queue = Queue(maxsize=5)
emotion_queue = Queue(maxsize=5)

def process_faces(frame): # Stelt indentiteit van de persoon vast.
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    face_locations = face_recognition.face_locations(rgb_frame, model="hog")
    face_encodings = face_recognition.face_encodings(rgb_frame, face_locations)
    face_queue.put((face_locations, face_encodings))

def process_emotions(frame, face_location): # Check de emoties die op het gezicht wordt vertooont 
    top, right, bottom, left = face_location
    face_image = frame[top:bottom, left:right]
    emotions = emotion_detector.detect_emotions(face_image)
    emotion_queue.put((face_location, emotions)) # Correnspondeert met een wachtrij

try:
    while True:
        ret, frame = video_capture.read()
        if not ret:
            print("Kan geen frame van de camera lezen. Opnieuw proberen...")
            time.sleep(1)
            continue

        frame_count += 1
        current_time = time.time()

        if frame_count % face_detection_interval == 0:
            Thread(target=process_faces, args=(frame.copy(),)).start()

        if not face_queue.empty():
            face_locations, face_encodings = face_queue.get()
            for (top, right, bottom, left), face_encoding in zip(face_locations, face_encodings):
                matches = face_recognition.compare_faces(known_face_encodings, face_encoding)
                face_id = "Onbekend"

                if True in matches:
                    first_match_index = matches.index(True)
                    face_id = known_face_ids[first_match_index]
                else:
                    face_id = f"Persoon {next_id}"
                    known_face_encodings.append(face_encoding)
                    known_face_ids.append(face_id)
                    next_id += 1

                last_seen[face_id] = current_time

                cv2.rectangle(frame, (left, top), (right, bottom), (0, 255, 0), 2)
                cv2.putText(frame, face_id, (left, top - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)

                if frame_count % emotion_detection_interval == 0:
                    Thread(target=process_emotions, args=(frame.copy(), (top, right, bottom, left))).start()

        if not emotion_queue.empty():
            face_location, emotions = emotion_queue.get()
            if emotions:
                top, right, bottom, left = face_location
                dominant_emotion = max(emotions[0]['emotions'], key=emotions[0]['emotions'].get)
                cv2.putText(frame, f"Emotion: {dominant_emotion}", (left, bottom + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

        for face_id in list(last_seen.keys()):
            if current_time - last_seen[face_id] > timeout:
                if face_id in known_face_ids:
                    index = known_face_ids.index(face_id)
                    known_face_encodings.pop(index)
                    known_face_ids.pop(index)
                del last_seen[face_id]
                print(f"Gezicht met ID {face_id} verwijderd wegens inactiviteit.")

        cv2.imshow('Video', frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

except KeyboardInterrupt:
    print("Programma onderbroken door gebruiker.")

finally:
    video_capture.release()
    cv2.destroyAllWindows()
    print("Programma beëindigd.")

Codeface

Code met arduino:

Hieronder een vergelijking van de code met Arduino-integratie versus de code zonder, met de nadruk op de veranderingen in elke sectie:

  1. Arduino Serial Communicatie:

    import serial: De serial bibliotheek is geïmporteerd om communicatie met de Arduino mogelijk te maken.

    arduino = serial.Serial('COM5', 9600): Initialiseert een seriële verbinding met de Arduino op de opgegeven COM-poort (hier COM5) en met een baudrate van 9600. Je moet de COM-poort aanpassen aan je eigen setup.

    time.sleep(2): Wacht 2 seconden om de seriële verbinding te laten opzetten.

    arduino.write(b'WATER_ON') en arduino.write(b'WATER_OFF'): Stuurt commando's naar de Arduino (in bytes) om de waterpomp aan en uit te schakelen.

    arduino.close(): Sluit de seriële verbinding aan het einde van het programma.

  2. Waterpomp Functionaliteit:

    cooldown_time = 120: Een variabele die de minimale tijd (in seconden) tussen de waterpompactivaties bepaalt.

    waterpump_active = False: Een vlag om aan te geven of de waterpomp momenteel actief is.

    last_happy_times = : Een dictionary om de laatste tijd bij te houden dat de waterpomp geactiveerd is voor een bepaald gezicht.

    activate_waterpump(person_id) functie: Activeert de waterpomp voor 8 seconden, na een 'happy' detectie voor een persoon. En wordt op een aparte thread uitgevoerd om de video bewerking niet te blokkeren.

  3. Emotie Detectie en Waterpomp Activering:

    Emotie Analyse: De code kijkt nu naar de top 3 emoties en toont deze op het scherm.

    "Happy" Trigger: Als de detectie van happy boven een drempelwaarde (0.5) komt, en de cooldown timer is afgelopen, dan wordt de waterpomp geactiveerd via de activate_waterpump functie.

  4. Externe Camera Fix

    De code maakt nu gebruik van cv2.VideoCapture(1, cv2.CAP_DSHOW) om de externe camera te starten en controleerd of de camera wel is verbonden.

  5. Debugger info

    De code geeft nu in de terminal de gedetecteerde emoties uit.

Uitleg in code per veranderingen:

  1. Importeer de serial bibliotheek:
import serial

Nieuw: Deze bibliotheek wordt toegevoegd om seriële communicatie met de Arduino mogelijk te maken.

  1. Arduino Setup:
arduino = serial.Serial('COM5', 9600)
time.sleep(2)

Nieuw:

serial.Serial('COM5', 9600): Initialiseert de seriële communicatie met de Arduino op de opgegeven COM-poort ('COM5' in dit voorbeeld) en een baudrate van 9600.

time.sleep(2): Wacht 2 seconden zodat de seriële verbinding stabiel tot stand kan komen.

  1. Gewijzigde variabelen initialisatie:
last_happy_times = {}
cooldown_time = 120
waterpump_active = False
emotion_detection_interval = 3

Nieuw:

last_happy_times: Een dictionary die de tijd van de laatste waterpompactivatie opslaat per persoon.

cooldown_time: Een variabele die de cooldown periode in seconden vaststelt.

waterpump_active: Een vlag die checkt of de waterpomp al geactiveerd is.

Gewijzigd:

emotion_detection_interval = 3: Is gewijzigd van 1 naar 3.

  1. Video Capture - Externe Camera:
     
video_capture = cv2.VideoCapture(1, cv2.CAP_DSHOW)
if not video_capture.isOpened():
    print("Kan de externe camera niet openen. Controleer of de camera is aangesloten.")
    exit()

Gewijzigd:

cv2.VideoCapture(1, cv2.CAP_DSHOW): Selecteert de externe camera.

De code controleert nu of de camera is aangesloten en de code afsluiten als dit niet het geval is.

  1. activate_waterpump Functie:
      
def activate_waterpump(person_id):
    global waterpump_active
    waterpump_active = True
    print(f"Blijdschap gedetecteerd voor {person_id}, waterpomp geactiveerd voor 8 seconden.")
    arduino.write(b'WATER_ON')
    time.sleep(8)
    arduino.write(b'WATER_OFF')
    print(f"Waterpomp uitgeschakeld voor {person_id}.")
    waterpump_active = False
   

Nieuw: Deze functie wordt aangeroepen om de waterpomp te activeren:

global waterpump_active: Gebruikt de global vlag

waterpump_active = True: Zet de vlag naar True.

arduino.write(b'WATER_ON'): Stuurt een signaal naar de arduino om de waterpomp aan te zetten.

time.sleep(8): De waterpomp blijft voor 8 seconden aan.

arduino.write(b'WATER_OFF'): Stuurt een signaal naar de arduino om de waterpomp uit te zetten.

waterpump_active = False: Zet de flag naar False.

  1. Wijzigingen in de Main Loop (Emotie Handling en Pomp Activatie):
      
if not emotion_queue.empty():
    face_location, emotions = emotion_queue.get()
    if emotions:
        top, right, bottom, left = face_location
        sorted_emotions = sorted(emotions[0]['emotions'].items(), key=lambda x: x[1], reverse=True)

        # Toon de top 3 emoties
        emotion_text = " | ".join([f"{emotion}: {conf:.2f}" for emotion, conf in sorted_emotions[:3]])
        cv2.putText(frame, emotion_text, (left, bottom + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
        
        # Debug informatie
        print(f"Gedetecteerde emoties: {sorted_emotions}")
        
        # Controleer op 'happy' emotie met lagere drempelwaarde
        if sorted_emotions[0][0] == 'happy' and sorted_emotions[0][1] > 0.5:
            face_id = next((id for id, loc in last_seen.items() if loc == current_time), None)
            if face_id:
                last_happy_time = last_happy_times.get(face_id, 0)
                if (current_time - last_happy_time) > cooldown_time:
                    if not waterpump_active:
                        last_happy_times[face_id] = current_time
                        Thread(target=activate_waterpump, args=(face_id,)).start()

Gewijzigd en Nieuw:

Top Emotie Weergave: Toont nu de top 3 emoties met bijbehorende waarden op het scherm.

Debug Output: Print gedetecteerde emoties naar de console.

Happy Emotie Drempel:

Checkt of de dominantste emotie 'happy' is met een drempel van 0.5.

Pomp Activatielogica:

Controleert of er een gezicht is gekoppeld aan de huidige tijd.

Kijkt of de cooldowntimer is afgelopen.

Kijkt of de waterpomp al actief is

Als dit het geval is wordt de activate_waterpump functie in een nieuwe thread gestart.

  1. Afsluitprocedure finally block:
      
finally:
    video_capture.release()
    cv2.destroyAllWindows()
    arduino.close()
    print("Programma beëindigd.")

Gewijzigd: arduino.close(): Sluit de seriële communicatie met de Arduino netjes af.


Complete code:


import cv2
import face_recognition
from fer import FER
import time
from threading import Thread
from queue import Queue
import serial

# Arduino setup
arduino = serial.Serial('COM5', 9600)  # Pas de COM-poort aan indien nodig
time.sleep(2)  # Wacht tot de verbinding is opgezet

# Initialize variables
known_face_encodings = []
known_face_ids = []
next_id = 1
last_seen = {}
last_happy_times = {}
timeout = 600  # 10 minutes in seconds
face_detection_interval = 2
emotion_detection_interval = 3
cooldown_time = 120  # Cooldown tijd voor waterpomp in seconden
waterpump_active = False

# Initialize emotion detector
emotion_detector = FER(mtcnn=True)

# Video capture voor externe camera
video_capture = cv2.VideoCapture(1, cv2.CAP_DSHOW)  # Gebruik 0 voor de eerste externe camera, of 1 als dat niet werkt
video_capture.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
video_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

if not video_capture.isOpened():
    print("Kan de externe camera niet openen. Controleer of de camera is aangesloten.")
    exit()

print("Programma gestart. Druk op 'q' om te stoppen.")

frame_count = 0
face_queue = Queue(maxsize=5)
emotion_queue = Queue(maxsize=5)

def process_faces(frame):
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    face_locations = face_recognition.face_locations(rgb_frame, model="hog")
    face_encodings = face_recognition.face_encodings(rgb_frame, face_locations)
    face_queue.put((face_locations, face_encodings))

def process_emotions(frame, face_location):
    top, right, bottom, left = face_location
    face_image = frame[top:bottom, left:right]
    emotions = emotion_detector.detect_emotions(face_image)
    emotion_queue.put((face_location, emotions))

def activate_waterpump(person_id):
    global waterpump_active
    waterpump_active = True
    print(f"Blijdschap gedetecteerd voor {person_id}, waterpomp geactiveerd voor 8 seconden.")
    arduino.write(b'WATER_ON')
    time.sleep(8)
    arduino.write(b'WATER_OFF')
    print(f"Waterpomp uitgeschakeld voor {person_id}.")
    waterpump_active = False

try:
    while True:
        ret, frame = video_capture.read()
        if not ret:
            print("Kan geen frame van de camera lezen. Opnieuw proberen...")
            time.sleep(1)
            continue

        frame_count += 1
        current_time = time.time()

        if frame_count % face_detection_interval == 0:
            Thread(target=process_faces, args=(frame.copy(),)).start()

        if not face_queue.empty():
            face_locations, face_encodings = face_queue.get()
            for (top, right, bottom, left), face_encoding in zip(face_locations, face_encodings):
                matches = face_recognition.compare_faces(known_face_encodings, face_encoding)
                face_id = "Onbekend"

                if True in matches:
                    first_match_index = matches.index(True)
                    face_id = known_face_ids[first_match_index]
                else:
                    face_id = f"Persoon {next_id}"
                    known_face_encodings.append(face_encoding)
                    known_face_ids.append(face_id)
                    next_id += 1

                last_seen[face_id] = current_time

                cv2.rectangle(frame, (left, top), (right, bottom), (0, 255, 0), 2)
                cv2.putText(frame, face_id, (left, top - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)

                if frame_count % emotion_detection_interval == 0:
                    Thread(target=process_emotions, args=(frame.copy(), (top, right, bottom, left))).start()

        if not emotion_queue.empty():
            face_location, emotions = emotion_queue.get()
            if emotions:
                top, right, bottom, left = face_location
                sorted_emotions = sorted(emotions[0]['emotions'].items(), key=lambda x: x[1], reverse=True)
                
                # Toon de top 3 emoties
                emotion_text = " | ".join([f"{emotion}: {conf:.2f}" for emotion, conf in sorted_emotions[:3]])
                cv2.putText(frame, emotion_text, (left, bottom + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
                
                # Debug informatie
                print(f"Gedetecteerde emoties: {sorted_emotions}")
                
                # Controleer op 'happy' emotie met lagere drempelwaarde
                if sorted_emotions[0][0] == 'happy' and sorted_emotions[0][1] > 0.5:
                    face_id = next((id for id, loc in last_seen.items() if loc == current_time), None)
                    if face_id:
                        last_happy_time = last_happy_times.get(face_id, 0)
                        if (current_time - last_happy_time) > cooldown_time:
                            if not waterpump_active:
                                last_happy_times[face_id] = current_time
                                Thread(target=activate_waterpump, args=(face_id,)).start()

        for face_id in list(last_seen.keys()):
            if current_time - last_seen[face_id] > timeout:
                if face_id in known_face_ids:
                    index = known_face_ids.index(face_id)
                    known_face_encodings.pop(index)
                    known_face_ids.pop(index)
                del last_seen[face_id]
                print(f"Gezicht met ID {face_id} verwijderd wegens inactiviteit.")

        cv2.imshow('Video', frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

except KeyboardInterrupt:
    print("Programma onderbroken door gebruiker.")

finally:
    video_capture.release()
    cv2.destroyAllWindows()
    arduino.close()
    print("Programma beëindigd.")

Arduino code:

const int relayPin = 5;  // Pin waarop het relais is aangesloten

void setup() {
  pinMode(relayPin, OUTPUT);
  Serial.begin(9600);  // Start seriële communicatie
}

void loop() {
  if (Serial.available() > 0) {
    String command = Serial.readStringUntil('\n');  // Lees het commando van de seriële poort

    if (command == "WATER_ON") {
      digitalWrite(relayPin, HIGH);  // Zet de pomp aan
    } 
    else if (command == "WATER_OFF") {
      digitalWrite(relayPin, LOW);   // Zet de pomp uit
    }
  }
}