Kaspersky Security Center OpenAPI を使用したKaspersky Endpoint Detection and Response Optimum の操作

2024年2月13日

ID 231480

Kaspersky Endpoint Detection and Response Optimum が Kaspersky Security Center Web Console を使用してローカルに展開されている場合、Kaspersky Security Center OpenAPI を使用して、自動化を構成し、一部の操作シナリオとタスクをカスタマイズできます。

OpenAPI の操作の詳細は、Kaspersky Security Center のヘルプを参照してください。

以下の例は、Kaspersky Security Center OpenAPI を使用してコンピューターをネットワークから分離するスクリプトを示しています。

スクリプトの例:

#!/usr/bin/python -tt

# -*- coding: utf-8 -*-

 

import sys

import os

import argparse

import socket

import time

import getpass

import urllib3

from sys import platform

from urllib.parse import urlparse

from KlAkOAPI.Params import KlAkParams, KlAkArray, paramParams, strToBin

from KlAkOAPI.AdmServer import KlAkAdmServer

from KlAkOAPI.Error import KlAkError, KlAkResponseError

from KlAkOAPI.CgwHelper import KlAkCgwHelper

from KlAkOAPI.GatewayConnection import KlAkGatewayConnection

from KlAkOAPI.HostGroup import KlAkHostGroup

from KlAkOAPI.ChunkAccessor import KlAkChunkAccessor

from KlAkOAPI.Tasks import KlAkTasks

from KlAkOAPI.HostTasks import KlAkHostTasks

from KlAkOAPI.NagHstCtl import KlAkNagHstCtl

 

# Default credentials for basic authentication.
# Either state the '-user' and '-password' arguments, or the credentials
# below (KSCServerUserAccountDefault / KSCServerUserPasswordDefault) are applied.
# An internal user with these credentials must be created in advance in
# Kaspersky Security Center (by using Kaspersky Security Center Web Console or
# the MMC-based Administration Console) and granted the required privileges,
# such as the 'Main Administrator' role.
# On the Windows platform, NTLM authentication is applied when the '-user' and
# '-password' arguments are omitted.
KSCServerUserAccountDefault = 'klakoapi_test'
KSCServerUserPasswordDefault = 'testpassword'

 

 

def GetHostNameByHostFQDN(server, strHostFQDN):
    """Find the (internal) host name of a device by its FQDN.

    Runs a FindHosts search filtered on KLHST_WKS_FQDN and returns the
    KLHST_WKS_HOSTNAME value of the first item found. The returned host name
    is required for a gateway connection to Network Agent.

    Args:
        server: connected Administration Server object.
        strHostFQDN: FQDN of the device to look up.

    Returns:
        The KLHST_WKS_HOSTNAME value of the first matching host.

    Raises:
        KlAkError: if the search returns no items.
    """
    search_filter = '(KLHST_WKS_FQDN="' + strHostFQDN + '")'
    requested_fields = ['KLHST_WKS_HOSTNAME', 'KLHST_WKS_DN', 'name']
    search_options = {'KLGRP_FIND_FROM_CUR_VS_ONLY': True}

    found = KlAkHostGroup(server).FindHosts(search_filter, requested_fields, [], search_options, 100)
    accessor_id = found.OutPar('strAccessor')

    # Read the search result (in case of ambiguity the first found host is taken).
    accessor = KlAkChunkAccessor(server)
    if accessor.GetItemsCount(accessor_id).RetVal() < 1:
        raise KlAkError('no gateway host found by name ' + strHostFQDN)

    # Only the first item of the result set is requested.
    first_chunk = accessor.GetItemsChunk(accessor_id, 0, 1)
    host_record = KlAkParams(first_chunk.OutPar('pChunk'))['KLCSP_ITERATOR_ARRAY'][0]
    host_name = host_record['KLHST_WKS_HOSTNAME']

    print('Host for Network Agent gateway connection is:', strHostFQDN, ' that corresponds to device', host_record['KLHST_WKS_DN'], 'in group', host_record['name'])

    return host_name

   

def GetServer(server_url):
    """Connect to Kaspersky Security Center Administration Server.

    On Windows no credentials are passed, so NTLM is used by default. On any
    other platform basic authentication is used with the module-level default
    credentials KSCServerUserAccountDefault / KSCServerUserPasswordDefault;
    that user must be created on the Administration Server in advance.

    Args:
        server_url: URL of the Administration Server, e.g. 'https://host:13299'.

    Returns:
        The object returned by KlAkAdmServer.Create.
    """
    if platform == "win32":
        # for Windows, use NTLM by default
        username = None
        password = None
    else:
        # Use the shared module-level defaults instead of repeating the
        # literals here, so the credentials are defined in exactly one place.
        username = KSCServerUserAccountDefault
        password = KSCServerUserPasswordDefault

    # NOTE(review): this is a Windows path, but it is passed as 'verify' on
    # every platform - confirm the certificate location for non-Windows hosts.
    SSLVerifyCert = 'C:\\ProgramData\\KasperskyLab\\adminkit\\1093\\cert\\klserver.cer'

    # create Administration Server object
    server = KlAkAdmServer.Create(server_url, username, password, verify = SSLVerifyCert)

    return server

    

def PrepareNagentGatewayConnection(server_main, server_parent_on_hierarchy = None, wsHostName = '', arrLocation = None):
    """Prepare a token for a gateway connection to Network Agent.

    See the 'Creating gateway connections' section in the Kaspersky Security
    Center OpenAPI documentation.

    Args:
        server_main: main Administration Server; the gateway connection is
            prepared on it.
        server_parent_on_hierarchy: server used to resolve the Network Agent
            location; defaults to server_main when None.
        wsHostName: internal host name of the device running Network Agent.
        arrLocation: list of locations. When a list is passed it is updated in
            place with the Network Agent location, so it can be used to build a
            chain of locations down the hierarchy. When omitted, a fresh list
            is created for this call.

    Returns:
        The 'wstrAuthKey' token to use for the gateway connection.

    Raises:
        Exception: if wsHostName is empty.
    """
    if wsHostName == '':
        raise Exception('no hosts found on Administration Server, nothing to demonstrate as Network Agent gateway connection')

    if server_parent_on_hierarchy is None:
        server_parent_on_hierarchy = server_main

    # A mutable default ([]) would be shared between calls and keep growing
    # with every append below; create a new list per call instead.
    if arrLocation is None:
        arrLocation = []

    # step 1: get Network Agent location
    cgwHelper = KlAkCgwHelper(server_parent_on_hierarchy)
    nagentLocation = cgwHelper.GetNagentLocation(wsHostName).RetVal()

    # step 2: build locations list
    arrLocation.append(paramParams(nagentLocation))

    # step 3: prepare gateway connection to main server with locations array built on previous step
    gatewayConnection = KlAkGatewayConnection(server_main)
    response = gatewayConnection.PrepareGatewayConnection(arrLocation)
    token_on_nagent = response.OutPar('wstrAuthKey')

    # use token for further gateway connection to Network Agent
    return token_on_nagent

 

def ConnectNagentGatewayAuth(server_url, gw_token):
    """Connect to Network Agent through a gateway connection.

    Args:
        server_url: address of the main Administration Server.
        gw_token: gateway token prepared with PrepareNagentGatewayConnection(...).

    Returns:
        The object returned by KlAkAdmServer.CreateGateway. It is returned
        whether or not the connection succeeded; the outcome is only printed.
    """
    print ('Main Kaspersky Security Center Administration Server address:', server_url)

    # NOTE(review): the third argument (False) presumably disables certificate
    # verification - confirm against the KlAkAdmServer.CreateGateway reference.
    nagent_server = KlAkAdmServer.CreateGateway(server_url, gw_token, False)

    if nagent_server.connected:
        outcome = 'Network Agent is connected successfully.'
    else:
        outcome = 'Failed to connect Network Agent.'
    print (outcome)

    return nagent_server

 

def CreteGatewayToHost(server, server_url, hostId):
    """Open a gateway connection to Network Agent on the given host.

    Args:
        server: connected Administration Server object used to prepare the token.
        server_url: address of the main Administration Server.
        hostId: internal host name of the target device.

    Returns:
        The server object returned by ConnectNagentGatewayAuth.
    """
    print ('-- Prepare Network Agent gateway connection --')

    # Prepare the token for gateway authentication, then connect with it.
    gateway_token = PrepareNagentGatewayConnection(server, wsHostName = hostId)
    return ConnectNagentGatewayAuth(server_url, gateway_token)

   

def FindHostTask(server, hostId, strDisplayName):
    """Search the tasks of a host for one with the given display name.

    Args:
        server: connected Administration Server object.
        hostId: internal host name of the device.
        strDisplayName: display name of the task to find.

    Returns:
        The 'pTaskData' of the first task whose
        ['TASK_INFO_PARAMS']['DisplayName'] equals strDisplayName. When the
        iterator runs out before a match, the last (None or empty) value is
        returned instead, so callers must check the result.
    """
    print('Searching for a task for the host', strDisplayName)

    tasks_obj_id = KlAkHostGroup(server).GetHostTasks(hostId).RetVal()

    host_tasks = KlAkHostTasks(server)
    host_tasks.ResetTasksIterator(tasks_obj_id, '', '', '', '', '')

    while True:
        task_data = host_tasks.GetNextTask(tasks_obj_id).OutPar('pTaskData')

        # A None or empty result means there are no more tasks to inspect.
        if task_data == None or len(task_data) == 0:
            return task_data

        display_name = task_data['TASK_INFO_PARAMS']['DisplayName']
        if display_name == strDisplayName:
            print('Task ' + display_name + ' found')
            return task_data

   

def StartHostTask(nagHstCtl, taskTsId, strProductName, strProductVersion):
    """Send the start action for a host task through Network Agent.

    Args:
        nagHstCtl: NagHstCtl object bound to the Network Agent connection.
        taskTsId: task storage identifier of the task.
        strProductName: product name the task belongs to.
        strProductVersion: product version the task belongs to.
    """
    # NOTE(review): 5 is the action code this sample sends to start a task -
    # confirm its meaning against the NagHstCtl.SendTaskAction reference.
    start_action = 5
    nagHstCtl.SendTaskAction(strProductName, strProductVersion, taskTsId, start_action)

   

def WaitTaskCompletion(nagHstCtl, taskTsId, strProductName, strProductVersion):
    """Poll the host runtime info until the task is no longer running.

    The runtime info is requested at most 20 times with a one-second pause
    between polls. Polling stops as soon as the task reports a state other
    than 1, or is no longer listed in the runtime info. If the task is still
    running after the last poll, the function returns without a message.

    Args:
        nagHstCtl: NagHstCtl object bound to the Network Agent connection.
        taskTsId: task storage identifier of the task to wait for.
        strProductName: product name used to filter the runtime info.
        strProductVersion: unused; kept for a signature parallel to StartHostTask.
    """
    pFilter = KlAkParams({})
    pFilter.AddParams('klhst-rt-TskInfo', {'klhst-ProductName': strProductName})

    max_polls = 20

    # The original condition was 'while n < 20 or bDone', which loops forever
    # once the task is done. A bounded loop with an early return stops polling
    # both on completion and after the poll limit.
    for _ in range(max_polls):
        info = nagHstCtl.GetHostRuntimeInfo(pFilter).RetVal()['klhst-rt-TskInfo'][strProductName]

        bFoundInfo = False
        for taskinfo in KlAkArray(info['klhst-rt-TskArray']):
            if taskinfo['taskStorageId'] == taskTsId:
                print('Current task state:', taskinfo['taskState'])

                bFoundInfo = True

                if taskinfo['taskState'] != 1: # Not running
                    print('Task is complete')
                    return

        # The task is no longer listed in the runtime info: treat it as complete.
        if not bFoundInfo:
            print('Task is complete')
            return

        time.sleep(1)

 

def main():
    """Isolate a device from the network by running its isolation task.

    Connects to the Administration Server on the current device, resolves the
    target host, finds the isolation task by its display name, starts it over
    a Network Agent gateway connection, and waits for it to complete.

    Raises:
        KlAkError: if the host or the isolation task cannot be found.
    """
    server_address = socket.getfqdn()
    server_port = 13299
    server_url = 'https://' + server_address + ':' + str(server_port)

    # Names of the tasks related to isolation. They can be found in the device
    # properties in Kaspersky Security Center Web Console.
    strIsolate = 'Network isolation'
    strUnblock = 'Termination of network isolation'
    hostaddress = 'some-host.lab.local'

    # Connecting to Kaspersky Security Center
    server = GetServer(server_url)

    # Fall back to the current device when no target host is configured.
    if hostaddress is None:
        hostaddress = socket.getfqdn()

    hostId = GetHostNameByHostFQDN(server, hostaddress)

    pTaskData = FindHostTask(server, hostId, strIsolate)
    # to remove isolation run
    # pTaskData = FindHostTask(server, hostId, strUnblock)

    # FindHostTask returns None or an empty value when no task matches.
    # Fail with a clear message instead of an obscure error on the lookups below.
    if pTaskData is None or len(pTaskData) == 0:
        raise KlAkError('isolation task was not found for host ' + hostaddress)

    taskTsId = pTaskData['TASK_UNIQUE_ID']
    strProductName = pTaskData['TASKID_PRODUCT_NAME']
    strProductVersion = pTaskData['TASKID_VERSION']

    nagent = CreteGatewayToHost(server, server_url, hostId)
    nagHstCtl = KlAkNagHstCtl(nagent)

    StartHostTask(nagHstCtl, taskTsId, strProductName, strProductVersion)
    WaitTaskCompletion(nagHstCtl, taskTsId, strProductName, strProductVersion)

 

# Run the sample only when the file is executed as a script, not when imported.
if __name__ == '__main__':
    main()

この記事はお役に立ちましたか?
改善できる点がありましたらお聞かせください。
フィードバックをいただき、ありがとうございました。改善に向けて取り組んでまいります。
フィードバックをいただき、ありがとうございました。改善に向けて取り組んでまいります。