package websocket import ( "encoding/json" "log" "net/http" "sync" "time" "github.com/gorilla/websocket" "sensor-server/internal/models" ) var upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true // 개발 환경에서는 모든 origin 허용 }, } // Client WebSocket 클라이언트 정보 type Client struct { ID string Conn *websocket.Conn Send chan []byte Hub *Hub mu sync.Mutex } // Hub WebSocket 허브 (연결 관리) type Hub struct { clients map[*Client]bool broadcast chan []byte register chan *Client unregister chan *Client mu sync.RWMutex } // NewHub 새로운 허브 생성 func NewHub() *Hub { return &Hub{ clients: make(map[*Client]bool), broadcast: make(chan []byte), register: make(chan *Client), unregister: make(chan *Client), } } // Run 허브 실행 func (h *Hub) Run() { for { select { case client := <-h.register: h.mu.Lock() h.clients[client] = true h.mu.Unlock() log.Printf("Client %s connected. Total clients: %d", client.ID, len(h.clients)) case client := <-h.unregister: h.mu.Lock() if _, ok := h.clients[client]; ok { delete(h.clients, client) close(client.Send) } h.mu.Unlock() log.Printf("Client %s disconnected. Total clients: %d", client.ID, len(h.clients)) case message := <-h.broadcast: h.mu.RLock() for client := range h.clients { select { case client.Send <- message: default: close(client.Send) delete(h.clients, client) } } h.mu.RUnlock() } } } // BroadcastSensorData 센서 데이터 브로드캐스트 func (h *Hub) BroadcastSensorData(reading *models.SensorReading) { // 클라이언트가 기대하는 WebSocketMessage 형식으로 변환 message := map[string]interface{}{ "type": "sensor_data", "data": reading, "timestamp": time.Now().Format(time.RFC3339), "device_id": reading.DeviceID, } data, err := json.Marshal(message) if err != nil { log.Printf("Error marshaling sensor data: %v", err) return } log.Printf("Broadcasting sensor data to %d clients: %s", len(h.clients), string(data)) h.broadcast <- data } // readPump 클라이언트로부터 메시지 읽기 func (c *Client) readPump() { defer func() { c.Hub.unregister <- c c.Conn.Close() }() c.Conn.SetReadLimit(512) c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second)) c.Conn.SetPongHandler(func(string) error { c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second)) return nil }) for { _, message, err := c.Conn.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { log.Printf("WebSocket read error: %v", err) } break } // 클라이언트로부터 받은 메시지 처리 (필요시) log.Printf("Received message from client %s: %s", c.ID, string(message)) } } // writePump 클라이언트로 메시지 쓰기 func (c *Client) writePump() { ticker := time.NewTicker(54 * time.Second) defer func() { ticker.Stop() c.Conn.Close() }() for { select { case message, ok := <-c.Send: c.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) if !ok { c.Conn.WriteMessage(websocket.CloseMessage, []byte{}) return } w, err := c.Conn.NextWriter(websocket.TextMessage) if err != nil { return } w.Write(message) if err := w.Close(); err != nil { return } case <-ticker.C: c.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil { return } } } } // ServeWs WebSocket 핸들러 func ServeWs(hub *Hub, w http.ResponseWriter, r *http.Request) { conn, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Printf("WebSocket upgrade error: %v", err) return } client := &Client{ ID: generateClientID(), Hub: hub, Conn: conn, Send: make(chan []byte, 256), } client.Hub.register <- client go client.writePump() go client.readPump() } // generateClientID 클라이언트 ID 생성 func generateClientID() string { return time.Now().Format("20060102150405") + "-" + randomString(8) } // randomString 랜덤 문자열 생성 func randomString(length int) string { const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" b := make([]byte, length) for i := range b { b[i] = charset[time.Now().UnixNano()%int64(len(charset))] } return string(b) }