open-toontown/otp/ai/AIRepository.py

1917 lines
75 KiB
Python

from pandac.PandaModules import *
from otp.otpbase import OTPGlobals
from .AIMsgTypes import *
from direct.showbase.PythonUtil import Functor
from direct.directnotify.DirectNotifyGlobal import directNotify
from direct.fsm import ClassicFSM
from direct.fsm import State
from direct.task import Task
from direct.distributed.AsyncRequest import cleanupAsyncRequests
from direct.distributed import ParentMgr
from direct.distributed.ConnectionRepository import ConnectionRepository
import sys
import os
import copy
import types
from direct.distributed.PyDatagram import PyDatagram
from direct.distributed.PyDatagramIterator import PyDatagramIterator
from direct.distributed.NetMessenger import NetMessenger
from direct.showbase.ContainerLeakDetector import ContainerLeakDetector
from direct.showbase import MessengerLeakDetector
from direct.showbase import LeakDetectors
from direct.showbase.GarbageReportScheduler import GarbageReportScheduler
from otp.avatar.DistributedPlayerAI import DistributedPlayerAI
from otp.distributed import OtpDoGlobals
from otp.ai.GarbageLeakServerEventAggregatorAI import GarbageLeakServerEventAggregatorAI
import time
import gc
class AIRepository(ConnectionRepository):
"""
The new AIRepository base class
It does not have:
- district or shard code (see AIDistrict.py)
- friends or secret friends code
- a collision traverser
of course, a derived class may add those.
It does have:
+ object creation code
+ channel listening code
+ context allocator/manager
"""
notify = directNotify.newCategory("AIRepository")
InitialContext = 100000
def __init__(
self, mdip, mdport, esip, esport, dcFileNames,
serverId,
minChannel, maxChannel, dcSuffix = 'AI'):
assert self.notify.debugStateCall(self)
self._channels={}
self.AIRunningNetYield = simbase.config.GetBool('ai-running-net-yield', 0)
self._msgBundleNames = []
# doId->requestDeleted object
self._requestDeletedDOs = {}
ConnectionRepository.__init__(
self, ConnectionRepository.CM_NATIVE, simbase.config)
self.dcSuffix = dcSuffix
simbase.setupCpuAffinities(minChannel)
self.distributedObjectRequests=set()
self.context=self.InitialContext
self.contextToClassName={}
self.setClientDatagram(0)
self.readDCFile(dcFileNames = dcFileNames)
# Save the state server id
self.serverId = serverId
# Save the connect info
self.mdurl = URLSpec(mdip, 1)
self.esurl = URLSpec(esip, 1)
if not self.mdurl.hasPort():
self.mdurl.setPort(mdport)
if not self.esurl.empty() and not self.esurl.hasPort():
self.esurl.setPort(esport)
self.notify.info("event server at %s." % (repr(self.esurl)))
# UDP socket for sending events to the event server.
self.udpSock = None
if not self.esurl.empty():
udpEventServer = SocketAddress()
if not udpEventServer.setHost(self.esurl.getServer(), self.esurl.getPort()):
self.notify.warning("Invalid host for event server: %s" % (self.esurl))
self.udpSock = SocketUDPOutgoing()
self.udpSock.InitToAddress(udpEventServer)
# Save the ranges of channels that the AI controls
self.minChannel = minChannel
self.maxChannel = maxChannel
self.notify.info("dynamic doIds in range [%s, %s], total %s" % (
minChannel, maxChannel, maxChannel - minChannel + 1))
assert maxChannel >= minChannel
# initialize the channel allocation
self.channelAllocator = UniqueIdAllocator(minChannel, maxChannel)
# Define the ranges of zones.
self.minZone = self.getMinDynamicZone()
self.maxZone = self.getMaxDynamicZone()
self.notify.info("dynamic zoneIds in range [%s, %s], total %s" % (
self.minZone, self.maxZone, self.maxZone - self.minZone + 1))
assert self.maxZone >= self.minZone
self.zoneAllocator = UniqueIdAllocator(self.minZone, self.maxZone)
if config.GetBool('detect-leaks', 0) or config.GetBool('ai-detect-leaks', 0):
self.startLeakDetector()
if config.GetBool('detect-messenger-leaks', 0) or config.GetBool('ai-detect-messenger-leaks', 0):
self.messengerLeakDetector = MessengerLeakDetector.MessengerLeakDetector(
'AI messenger leak detector')
if config.GetBool('leak-messages', 0):
MessengerLeakDetector._leakMessengerObject()
if config.GetBool('run-garbage-reports', 0) or config.GetBool('ai-run-garbage-reports', 0):
noneValue = -1.
reportWait = config.GetFloat('garbage-report-wait', noneValue)
reportWaitScale = config.GetFloat('garbage-report-wait-scale', noneValue)
if reportWait == noneValue:
reportWait = None
if reportWaitScale == noneValue:
reportWaitScale = None
self.garbageReportScheduler = GarbageReportScheduler(waitBetween=reportWait,
waitScale=reportWaitScale)
self._proactiveLeakChecks = (config.GetBool('proactive-leak-checks', 1) and
config.GetBool('ai-proactive-leak-checks', 1))
self._crashOnProactiveLeakDetect = config.GetBool('crash-on-proactive-leak-detect', 1)
# Give ourselves the first channel in the range
self.ourChannel = self.allocateChannel()
# These are used to query database objects directly; currently
# used only for offline utilities.
self.dbObjContext = 0
self.dbObjMap = {}
# The UtilityAIRepository sets this to 0 to indicate we should
# not do things like issue new catalogs to toons that we load
# in. However, in the normal AI repository, we should do
# these things.
self.doLiveUpdates = 1
#for generating unqiue names for non-dos, manly used for tasks
self.keyCounter = 0
self.MaxEpockSpeed = self.config.GetFloat('ai-net-yield-epoch', 1.0/30.0)
if self.AIRunningNetYield :
taskMgr.doYield =self.taskManagerDoYieldNetwork
# use this for time yields without sleeps
#taskMgr.doYield = self.taskManagerDoYield
# Used for moderation of report-a-player feature
self.centralLogger = self.generateGlobalObject(
OtpDoGlobals.OTP_DO_ID_CENTRAL_LOGGER,
"CentralLogger")
self.garbageLeakLogger = GarbageLeakServerEventAggregatorAI(self)
taskMgr.add(self._checkBundledMsgs, 'checkBundledMsgs', priority=-100)
# skip a bit so we miss the startup sequence (it has very long frames as things are set up)
taskMgr.doMethodLater(2 * 60., self._startPerformanceLogging,
'startPerformanceLogging')
self.connectionName = None
self.connectionURL = None
def _startPerformanceLogging(self, task=None):
period = self.config.GetFloat(
'ai-performance-log-period',
self.config.GetFloat('server-performance-log-period',
choice(__dev__, 60. * 10., 60.)
)
)
self._sampledMaxFrameDuration = 0.
self._sampleMaxFrameDuration()
self._numPyObjs = None
self._getNumObjCounterLimit = int(max(1, (60 * 60.) / period))
self._getNumObjCounter = 0
taskMgr.doMethodLater(period, self._logPerformanceData, 'logPerformanceData')
return Task.done
def _sampleMaxFrameDuration(self, task=None):
self._sampledMaxFrameDuration = max(self._sampledMaxFrameDuration,
globalClock.getMaxFrameDuration())
# call this at a higher frequency than the frequency at which the global
# clock completely replaces its frame duration samples. that should ensure that
# we don't miss a slow frame in the performance logging
taskMgr.doMethodLater(globalClock.getAverageFrameRateInterval() * .75,
self._sampleMaxFrameDuration, 'sampleMaxFrameDuration')
return Task.done
def _logPerformanceData(self, task=None):
avgFrameDur = 1. / globalClock.getAverageFrameRate()
maxFrameDur = max(self._sampledMaxFrameDuration,
globalClock.getMaxFrameDuration())
self._sampledMaxFrameDuration = 0.
# gc.get_objects can be slow. only sample object count every once in a while
self._getNumObjCounter -= 1
if self._getNumObjCounter <= 0:
self._numPyObjs = len(gc.get_objects())
self._getNumObjCounter = self._getNumObjCounterLimit
self.notify.info(
'avg frame duration=%fs, max frame duration=%fs, num Python objects=%s' % (
avgFrameDur, maxFrameDur, self._numPyObjs))
return Task.again
def startLeakDetector(self):
if hasattr(self, 'leakDetector'):
return False
firstCheckDelay = config.GetFloat('leak-detector-first-check-delay', -1)
if firstCheckDelay == -1:
firstCheckDelay = None
self.leakDetector = ContainerLeakDetector(
'AI container leak detector', firstCheckDelay = firstCheckDelay)
self.accept(self.leakDetector.getLeakEvent(), self._handleLeak)
self.objectTypesLeakDetector = LeakDetectors.ObjectTypesLeakDetector()
self.garbageLeakDetector = LeakDetectors.GarbageLeakDetector()
self.cppMemoryUsageLeakDetector = LeakDetectors.CppMemoryUsage()
self.taskLeakDetector = LeakDetectors.TaskLeakDetector()
self.messageListenerTypesLeakDetector = LeakDetectors.MessageListenerTypesLeakDetector()
# this isn't necessary with the current messenger implementation
#self.messageTypesLeakDetector = LeakDetectors.MessageTypesLeakDetector()
return True
def _getMsgName(self, msgId):
# we might get a list of message names, use the first one
return makeList(AIMsgId2Names.get(msgId, 'UNKNOWN MESSAGE: %s' % msgId))[0]
def _handleLeak(self, container, containerName):
# TODO: send email with warning
self.notify.info('sending memory leak server event')
self.writeServerEvent('memoryLeak', self.ourChannel, '%s|%s|%s|%s' % (
containerName, len(container), itype(container),
fastRepr(container, maxLen=1, strFactor=50)))
def getPlayerAvatars(self):
return [i for i in self.doId2do.values()
if isinstance(i, DistributedPlayerAI)]
def uniqueName(self, desc):
return desc+"-"+str(self.serverId)
def trueUniqueName(self, desc):
self.keyCounter += 1
return desc+"-"+str(self.serverId)+"-"+str(self.keyCounter)
def allocateContext(self):
self.context+=1
if self.context >= (1<<32):
self.context=self.InitialContext
return self.context
#### Init ####
def writeServerEvent(self, eventType, who, description, serverId=None):
"""
Sends the indicated event data to the event server via UDP for
recording and/or statistics gathering. All related events
should be given the same eventType (some arbitrary string).
The who field should be the numeric or alphanumeric
representation of who generated this activity; generally this
will be an avatarId or the doId of the AI or something. The
description is a one-line description of the event, possibly
with embedded vertical bars as separators.
"""
if not self.udpSock:
self.notify.debug("Unable to log server event: no udpSock.")
return
# Make who be a string (it might be passed in as an integer)
who = str(who)
# log the access string for avatar-related messages
doId = None
try:
doId = int(who)
except:
pass
if doId is not None and doId in self.doId2do:
av = self.getDo(doId)
if hasattr(av, 'getAccess'):
description = '%s|%s' % (description, av.getAccess())
# break it up into chunks that will fit in a UDP packet (max 1024 bytes)
# leave some room for the header/eventType/who
maxLen = 900
breakCount = 0
# always run this loop at least once
while True:
if breakCount > 0:
eventType = '%s-continued%s' % (eventType, breakCount)
# Count up the number of bytes in the packet
length = 2 + 2 + 4 + 2 + len(eventType) + 2 + len(who) + 2 + len(description)
dg = PyDatagram()
dg.addUint16(length)
dg.addUint16(1) # message type 1: server event
dg.addUint16(6) # server type 6: AI server
dg.addUint32(self.ourChannel)
dg.addString(eventType)
dg.addString(who)
dg.addString(description[:maxLen])
description = description[maxLen:]
self.notify.debug('%s|AIevent:%s|%s|%s' % (eventType, self.serverId, who, description))
if not self.udpSock.Send(dg.getMessage()):
self.notify.warning("Unable to log server event: %s" % (self.udpSock.GetLastError()))
if len(description) == 0:
break
breakCount += 1
def writeServerStatus(self, who, avatar_count, object_count):
"""
Sends the Status Packet to the event server via UDP for
recording. Used to monito the health of a AI Server.
"""
if not self.udpSock:
self.notify.debug("Unable to log server Status: no udpSock.")
return
# Make who be a string (it might be passed in as an integer)
#who = str(who)
who="";
# Count up the number of bytes in the packet
length = 2 + 2 + 4 + (len(who)+2) + 4 +4
dg = PyDatagram()
dg.addUint16(length)
dg.addUint16(2) # message type 2: server event
dg.addUint16(6) # server type 6: AI server
dg.addUint32(self.ourChannel)
dg.addString(who)
dg.addUint32(avatar_count)
dg.addUint32(object_count)
if not self.udpSock.Send(dg.getMessage()):
self.notify.warning("Unable to log server status: %s" % (self.udpSock.GetLastError()))
def writeServerStatus2(self, who, avatar_count, object_count):
"""
Sends the Status Packet to the event server via UDP for
recording. Used to monito the health of a AI Server.
ServerStatus2 has an additional channel code added for a ping message
We can consolidate the two writeServerStatus messages when toontown can
adopt the new otp_server
"""
if not self.udpSock:
self.notify.debug("Unable to log server Status: no udpSock.")
return
# Make who be a string (it might be passed in as an integer)
#who = str(who)
who="";
# Count up the number of bytes in the packet
length = 2 + 2 + 4 + (len(who)+2) + 8 + 4 + 4
dg = PyDatagram()
dg.addUint16(length)
dg.addUint16(3) # message type 2: server event
dg.addUint16(6) # server type 6: AI server
dg.addUint32(self.ourChannel)
dg.addString(who)
dg.addUint64(self.ourChannel)
dg.addUint32(avatar_count)
dg.addUint32(object_count)
if not self.udpSock.Send(dg.getMessage()):
self.notify.warning("Unable to log server status: %s" % (self.udpSock.GetLastError()))
##### Off #####
def enterOff(self):
self.handler = None
def exitOff(self):
self.handler = None
#### connect #####
def enterConnect(self):
self.handler = self.handleConnect
self.lastMessageTime = 0
self.connect([self.mdurl],
successCallback = self._connected,
failureCallback = self._failedToConnect)
def _failedToConnect(self, statusCode, statusString):
self.fsm.request("noConnection")
def _connected(self):
# Register our channel
self.registerForChannel(self.ourChannel)
self.netMessenger = NetMessenger(self, (OTP_NET_MSGR_CHANNEL_ID_ALL_AI,))
## self.netMessenger.accept("transferDo", self, self.handleTransferDo)
def _handleIgnorableObjectDelete(self, msgType, di):
doId = di.getUint32()
self.notify.debug("Ignoring request to delete doId: " +
str(doId))
def exitConnect(self):
self.handler = None
# Clean up the create district tasks
taskMgr.remove(self.uniqueName("newDistrictWait"))
del self.lastMessageTime
##### PlayGame #####
def enterPlayGame(self):
self.handler = self.handlePlayGame
self.createObjects()
def handleConnect(self, msgType, di):
self.lastMessageTime = globalClock.getRealTime()
if msgType == STATESERVER_DISTRICT_DOWN:
self._handleValidDistrictDown(msgType, di)
elif msgType == STATESERVER_DISTRICT_UP:
self._handleValidDistrictUp(msgType, di)
elif msgType == STATESERVER_OBJECT_DELETE_RAM:
self._handleIgnorableObjectDelete(msgType, di)
else:
self.handleMessageType(msgType, di)
def handlePlayGame(self, msgType, di):
# NOTE: Inheritors may override this to check their
# own message types before calling this handler
# See ToontownAIRepository.py for example.
if msgType == STATESERVER_OBJECT_UPDATE_FIELD:
self._handleUpdateField(di)
elif msgType == STATESERVER_OBJECT_CHANGE_ZONE:
self._handleObjectChangeZone(di)
elif msgType == STATESERVER_OBJECT_DELETE_RAM:
self._handleDeleteObject(di)
elif msgType == DBSERVER_MAKE_FRIENDS_RESP:
self._handleMakeFriendsReply(di)
elif msgType == DBSERVER_REQUEST_SECRET_RESP:
self._handleRequestSecretReply(di)
elif msgType == DBSERVER_SUBMIT_SECRET_RESP:
self._handleSubmitSecretReply(di)
else:
self.handleMessageType(msgType, di)
def handleAvatarUsage(self, di):
"""
Should only be handled by the UD process containing the AvatarManagerUD
"""
pass
def handleAccountUsage(self, di):
"""
Should only be handled by the UD process containing the AvatarManagerUD
"""
pass
def handleObjectDeleteDisk(self, di):
pass
def handleObjectQueryField(self, di):
assert self.notify.debugStateCall(self)
doId = di.getUint32()
fieldId = di.getUint16()
context = di.getUint32()
success = di.getUint8()
if success and context:
className = self.contextToClassName.pop(context, None)
# This prevents a crash that occurs when an AI sends a query,
# crashes, comes back up, then receives the response
if className:
dclass = self.dclassesByName.get(className)
interface = dclass.getFieldByIndex(fieldId)
packer = DCPacker()
packer.setUnpackData(di.getRemainingBytes())
packer.beginUnpack(interface)
value = packer.unpackObject()
messenger.send(
"doFieldResponse-%s"%(context,), [context, value])
def handleObjectQueryFields(self, di):
assert self.notify.debugStateCall(self)
doId = di.getUint32()
context = di.getUint32()
success = di.getUint8()
if success and context:
className = self.contextToClassName.pop(context)
# This prevents a crash that occurs when an AI sends a query,
# crashes, comes back up, then receives the response
if className:
dclass = self.dclassesByName.get(className)
packer = DCPacker()
objData = {}
while di.getRemainingSize() > 0:
fieldId = di.getUint16()
interface = dclass.getFieldByIndex(fieldId)
packer.setUnpackData(di.getRemainingBytes())
packer.beginUnpack(interface)
value = packer.unpackObject()
packer.endUnpack()
objData[interface.getName()] = value
di.skipBytes(packer.getNumUnpackedBytes())
messenger.send("doFieldResponse-%s"%(context,),[context,objData])
else:
self.notify.warning("STATESERVER_OBJECT_QUERY_FIELDS_RESP received with invalid context: %s" % context)
messenger.send("doFieldQueryFailed-%s"%(context),[context])
else:
messenger.send("doFieldQueryFailed-%s"%(context),[context])
def handleServerPing(self, di):
assert self.notify.debugStateCall(self)
# Deconstruct ping message from stateserver
sec = di.getUint32()
usec = di.getUint32()
url = di.getString()
channel = di.getUint32()
# Send ping back to state server
datagram = PyDatagram()
sender=self.getMsgSender()
datagram.addServerHeader(
sender, self.ourChannel, SERVER_PING)
# A context that can be used to index the response if needed
datagram.addUint32(sec)
datagram.addUint32(usec)
datagram.addString(url)
datagram.addUint32(channel)
self.send(datagram)
def handleMessageType(self, msgType, di):
if msgType == CLIENT_GET_STATE_RESP:
# This one comes back after we sendSetAvatarIdMsg()
pass
elif msgType == DBSERVER_GET_STORED_VALUES_RESP:
self._handleDatabaseGetStoredValuesResp(di)
elif msgType == DBSERVER_CREATE_STORED_OBJECT_RESP:
self._handleDatabaseCreateStoredObjectResp(di)
elif msgType == STATESERVER_OBJECT_CREATE_WITH_REQUIRED_CONTEXT_RESP:
self._handleDatabaseGenerateResponse(di)
elif msgType == STATESERVER_OBJECT_SET_ZONE:
# import pdb;pdb.set_trace()
self.handleSetLocation(di)
elif msgType == STATESERVER_OBJECT_LEAVING_AI_INTEREST:
# import pdb;pdb.set_trace()
self.handleDistObjExit(di)
elif msgType == STATESERVER_QUERY_OBJECT_ALL_RESP:
self.handleDistObjRequestResponse(di)
elif msgType == STATESERVER_OBJECT_ENTER_AI_RECV:
self.handleDistObjEnter(di)
elif msgType == STATESERVER_OBJECT_ENTERZONE_WITH_REQUIRED_OTHER:
self.handleDistObjEnterZone(di)
elif msgType == STATESERVER_QUERY_OBJECT_CHILDREN_RESP:
# This acts much like a generate with required and other
self.handleDistObjEnter(di)
elif msgType == STATESERVER_QUERY_OBJECT_CHILDREN_LOCAL_DONE:
# This acts much like a generate with required and other
self.handleQueryObjectChildrenLocalDone(di)
elif msgType == STATESERVER_OBJECT_ENTER_OWNER_RECV:
# This message is sent at some stage during avatar creation.
self.handleDistObjEnterOwner(di)
elif msgType == STATESERVER_OBJECT_CHANGE_OWNER_RECV:
self.handleDistObjChangeOwner(di)
elif msgType == ACCOUNT_AVATAR_USAGE:
self.handleAvatarUsage(di)
elif msgType == ACCOUNT_ACCOUNT_USAGE:
self.handleAccountUsage(di)
elif msgType == STATESERVER_OBJECT_DELETE_DISK:
self.handleObjectDeleteDisk(di)
elif msgType == STATESERVER_OBJECT_QUERY_FIELD_RESP:
self.handleObjectQueryField(di)
elif msgType == STATESERVER_OBJECT_QUERY_FIELDS_RESP:
self.handleObjectQueryFields(di)
elif msgType == STATESERVER_OBJECT_QUERY_MANAGING_AI:
pass
elif msgType == SERVER_PING:
self.handleServerPing(di)
else:
AIRepository.notify.warning(
"Ignoring unexpected message type: %s in state: %s" %
(msgType, self.fsm.getCurrentState().getName()))
if __dev__:
import pdb
pdb.set_trace()
def exitPlayGame(self):
self.handler = None
self.stopReaderPollTask()
self.deleteDistributedObjects()
cleanupAsyncRequests()
# Make sure there are no leftover tasks that shouldn't be here.
for task in taskMgr.getTasks():
if (task.name in ("igLoop",
"aiSleep",
"doLaterProcessor",
"eventManager",
"tkLoop",
)):
# These tasks are ok
continue
else:
print(taskMgr)
self.notify.error("You can't leave otp until you clean up your tasks.")
# Make sure there are no event hooks still hanging.
if not messenger.isEmpty():
print(messenger)
self.notify.error("Messenger should not have any events in it.")
##### NoConnection #####
def enterNoConnection(self):
self.handler = self.handleMessageType
AIRepository.notify.warning(
"Failed to connect to message director at %s." % (repr(self.mdurl)))
# Wait five seconds, then try to reconnect
taskMgr.doMethodLater(5, self.reconnect, self.uniqueName("waitToReconnect"))
def reconnect(self, task):
self.fsm.request("connect")
return Task.done
def exitNoConnection(self):
self.handler = None
taskMgr.remove(self.uniqueName("waitToReconnect"))
##### General Purpose functions #####
def createObjects(self):
# This is meant to be a pure virtual function that gets
# overridden by inheritors. This is where you should create
# DistributedObjectAI's (and generate them), create the objects
# that manage them, as well as spawn
# any tasks that will create DistributedObjectAI's in the future.
pass
def getHandleClassNames(self):
# This is meant to be a pure virtual function that gets
# overridden by inheritors.
# This function should return a tuple or list of string names
# that represent distributed object classes for which we want
# to make a special 'handle' class available. For instance,
# to make the DistributedToonHandleAI class available, this
# function should return ('DistributedToon',).
return ()
def handleDistObjRequestResponse(self, di):
assert self.notify.debugStateCall(self)
context = di.getUint32()
parentId = di.getUint32()
zoneId = di.getUint32()
classId = di.getUint16()
doId = di.getUint32()
# if there's no context, there's no point in doing anything else
if context:
# Look up the dclass
dclass = self.dclassesByNumber[classId]
# Create a new distributed object
distObj = self._generateFromDatagram(
parentId, zoneId, dclass, doId, di, addToTables=False)
#distObj.isQueryAllResponse = True
self.writeServerEvent('doRequestResponse', doId, '%s'%(context,))
messenger.send("doRequestResponse-%s"%(context,), [context, distObj])
def postGenerate(self, context, distObj):
parentId = distObj.parentId
zoneId = distObj.zoneId
doId = distObj.doId
self.distributedObjectRequests.discard(doId)
distObj.setLocation(parentId, zoneId)
self.writeServerEvent('distObjEnter', doId, '')
def handleDistObjEnter(self, di):
assert self.notify.debugStateCall(self)
context = di.getUint32()
parentId = di.getUint32()
zoneId = di.getUint32()
classId = di.getUint16()
doId = di.getUint32()
# Look up the dclass
dclass = self.dclassesByNumber[classId]
# Is it in our dictionary?
if doId in self.doId2do:
self.notify.warning("Object Entered " + str(doId) +
" re-entered without exiting")
# Create a new distributed object
distObj = self._generateFromDatagram(
parentId, zoneId, dclass, doId, di)
self.postGenerate(context, distObj)
# Put it in the dictionary - Is it already in our dictionary? Not
# sure why or how this happens, but it does come up from time to
# time in production
# Asad Todo take out this security check for now don't publish to toontown!!!
## if doId in self.doId2do:
## self.writeServerEvent('suspicious', doId, 'Avatar re-entered without exiting')
## # Note: until we figure out what is causing this bug, it will be an error
## # This is listed as bug 50608 in the production remarks db
## self.notify.error("Avatar %s re-entered without exiting" % doId)
def handleDistObjEnterZone(self, di):
assert self.notify.debugStateCall(self)
parentId = di.getUint32()
zoneId = di.getUint32()
classId = di.getUint16()
doId = di.getUint32()
# Look up the dclass
dclass = self.dclassesByNumber[classId]
# Is it in our dictionary?
if doId in self.doId2do:
self.notify.warning("Object Entered " + str(doId) +
" re-entered without exiting")
# Create a new distributed object
distObj = self._generateFromDatagram(
parentId, zoneId, dclass, doId, di)
# self.postGenerate(context, distObj)
# Put it in the dictionary - Is it already in our dictionary? Not
# sure why or how this happens, but it does come up from time to
# time in production
# Asad Todo take out this security check for now don't publish to toontown!!!
## if doId in self.doId2do:
## self.writeServerEvent('suspicious', doId, 'Avatar re-entered without exiting')
## # Note: until we figure out what is causing this bug, it will be an error
## # This is listed as bug 50608 in the production remarks db
## self.notify.error("Avatar %s re-entered without exiting" % doId)
def handleDistObjEnterOwner(self, di):
# TEMP
return self.handleDistObjEnter(di)
"""
assert self.notify.debugStateCall(self)
# The context is bogus
context = di.getUint32()
# The zone the avatar is in
parentId = di.getUint32()
zoneId = di.getUint32()
# Get the class Id
classId = di.getUint16()
# Get the DO Id
doId = di.getUint32()
# Look up the dclass
dclass = self.dclassesByNumber[classId]
self.notify.info(
'ignoring owner create, context=%s, parentId=%s, '
'zoneId=%s, doId=%s, dclass=%s' % (
context, parentId, zoneId, doId, dclass.getName()))
"""
def handleDistObjChangeOwner(self, di):
assert self.notify.debugStateCall(self)
doId = di.getUint32()
newOwnerId = di.getUint32()
oldOwnerId = di.getUint32()
self.notify.info(
'ignoring owner change, '
'doId=%s, newOwnerId=%s, oldOwnerID=%s' % (
doId, newOwnerId, oldOwnerId))
def getDeleteDoIdEvent(self, doId):
# this event is sent after the object is deleted,
# and is sent even if the object was not in the tables.
return 'AIRDeleteDoId-%s' % doId
def handleDistObjExit(self, di):
# Get the distributed object id
doId = di.getUint32()
self.notify.debug("handleDistObjExit %s" % doId)
# If it is in the dictionary, remove it.
obj = self.doId2do.get(doId)
if obj:
self.deleteDistObject(obj)
else:
self.notify.warning("DistObj " + str(doId) +
" exited, but never entered.")
# announce delete event for this doId, even if obj doesn't exist
# send it after we delete any existing object
messenger.send(self.getDeleteDoIdEvent(doId))
# TODO: remove this in favor of getDeleteDoIdEvent
# Throw an event telling everyone that the distObj is gone
self._announceDistObjExit(doId)
def _announceDistObjExit(self, doId):
pass
def _generateFromDatagram(self, parentId, zoneId, dclass, doId, di, addToTables=True):
if (doId in self.doId2do):
# added to prevent objects already generated from being generated again (was
# happening with some traded inventory objects, quests specfically)
return self.doId2do[doId]
# We got a datagram telling us to create a new DO instance
classDef = dclass.getClassDef()
try:
distObj = classDef(self)
except TypeError as e:
self.notify.error('%s (class %s, parentId %d, zoneId %d, doId %d)' % \
(e, dclass.getName(), parentId, zoneId, doId))
distObj.dclass = dclass
# Assign it an Id
distObj.doId = doId
# Since the distObj has been created explicitly from the
# server, we do not own its doId, and hence we shouldn't try
# to deallocate it.
distObj.doNotDeallocateChannel = 1
# init the parentId and zoneId
if addToTables:
# add the new DO to the tables
# let addDoToTables set the parentId and zoneId, it ignores
# the location if it matches what's already on the object
distObj.parentId = None
distObj.zoneId = None
self.addDOToTables(distObj, location = (parentId,zoneId))
else:
distObj.parentId = parentId
distObj.zoneId = zoneId
# Generate this Object
distObj.generate()
# Update the required fields
distObj.updateAllRequiredOtherFields(dclass, di)
return distObj
def _handleUpdateField(self, di):
# Get the Do Id
doId = di.getUint32()
# Find the do
do = self.doId2do[doId]
# Let the dclass finish the job
do.dclass.receiveUpdate(do, di)
def _handleObjectChangeZone(self, di):
# Get the Do Id
doId = di.getUint32()
newParentId = di.getUint32()
newZoneId = di.getUint32()
oldParentId = di.getUint32()
oldZoneId = di.getUint32()
# Find the do
do = self.doId2do.get(doId)
if do is None:
self.notify.warning(
'handleObjectChangeZone: (NOT PRESENT) %s:(%s, %s)->(%s, %s)' %(
doId, oldParentId, oldZoneId, newParentId, newZoneId))
else:
self.notify.debug('handleObjectChangeZone: %s:(%s, %s)->(%s, %s)' %(
doId, oldParentId, oldZoneId, newParentId, newZoneId))
# TODO: pass in the old parent and old zone
do.setLocation(newParentId, newZoneId) #, oldParentId, oldZoneId)
def _handleDeleteObject(self, di):
# Get the DO Id
doId = di.getUint32()
obj = self.doId2do.get(doId)
if obj:
# If it is in the dictionary, remove it.
self.deleteDistObject(obj)
else:
# Otherwise ignore it
AIRepository.notify.warning(
"Asked to delete non-existent DistObjAI " + str(doId))
# announce delete event for this doId, even if obj doesn't exist
messenger.send(self.getDeleteDoIdEvent(doId))
def sendUpdate(self, do, fieldName, args):
#print("---------------sendUpdate--")
#print(do)
#print(do.doId)
#print(fieldName)
#print(args)
dg = do.dclass.aiFormatUpdate(
fieldName, do.doId, do.doId, self.ourChannel, args)
self.sendDatagram(dg)
def sendUpdateToDoId(self, dclassName, fieldName, doId, args,
channelId=None):
"""
channelId can be used as a recipient if you want to bypass the normal
airecv, ownrecv, broadcast, etc. If you don't include a channelId
or if channelId == doId, then the normal broadcast options will
be used.
See Also: def queryObjectField
"""
dclass=self.dclassesByName.get(dclassName+self.dcSuffix)
assert dclass is not None
if channelId is None:
channelId=doId
if dclass is not None:
dg = dclass.aiFormatUpdate(
fieldName, doId, channelId, self.ourChannel, args)
self.send(dg)
def createDgUpdateToDoId(self, dclassName, fieldName, doId, args,
channelId=None):
"""
channelId can be used as a recipient if you want to bypass the normal
airecv, ownrecv, broadcast, etc. If you don't include a channelId
or if channelId == doId, then the normal broadcast options will
be used.
This is just like sendUpdateToDoId, but just returns
the datagram instead of immediately sending it.
"""
result = None
dclass=self.dclassesByName.get(dclassName+self.dcSuffix)
assert dclass is not None
if channelId is None:
channelId=doId
if dclass is not None:
dg = dclass.aiFormatUpdate(
fieldName, doId, channelId, self.ourChannel, args)
result = dg
return result
def sendUpdateToGlobalDoId(self, dclassName, fieldName, doId, args):
"""
Used for sending messages from an AI directly to an
uber object.
"""
dclass = self.dclassesByName.get(dclassName)
assert dclass, 'dclass %s not found in DC files' % dclassName
dg = dclass.aiFormatUpdate(
fieldName, doId, doId, self.ourChannel, args)
self.send(dg)
def sendUpdateToChannel(self, do, channelId, fieldName, args):
dg = do.dclass.aiFormatUpdate(
fieldName, do.doId, channelId, self.ourChannel, args)
self.sendDatagram(dg)
def sendUpdateToChannelFrom(self, do, channelId, fieldName, fromid, args):
dg = do.dclass.aiFormatUpdate(fieldName, do.doId, channelId,
fromid, args)
self.send(dg)
def startMessageBundle(self, name):
# start bundling messages together. Use this for instance if you want to
# make sure that location and position of an object are processed atomically
# on the state server, to prevent clients from getting the new location and an
# old position (relative to old location) on client generate of the object
self._msgBundleNames.append(name)
ConnectionRepository.startMessageBundle(self)
def sendMessageBundle(self, senderChannel):
# stop bundling messages and send the bundle
# senderChannel is typically the doId of the object affected by the messages
ConnectionRepository.sendMessageBundle(self, self.districtId, senderChannel)
self._msgBundleNames.pop()
def _checkBundledMsgs(self, task=None):
# message bundles should not last across frames
num = len(self._msgBundleNames)
while len(self._msgBundleNames):
self.notify.warning('abandoning message bundle: %s' % self._msgBundleNames.pop())
if num > 0:
self.abandonMessageBundles()
self.notify.error('message bundling leak, see warnings above (most recent first)')
return task.cont
def registerForChannel(self, channelNumber):
if self._channels.get(channelNumber):
# We are already registered for this channel.
return
self._channels[channelNumber]=1
# Time to send a register for channel message to the msgDirector
datagram = PyDatagram()
# datagram.addServerControlHeader(CONTROL_SET_CHANNEL)
datagram.addInt8(1)
datagram.addChannel(CONTROL_MESSAGE)
datagram.addUint16(CONTROL_SET_CHANNEL)
datagram.addChannel(channelNumber)
self.send(datagram)
def addPostSocketClose(self, themessage):
# Time to send a register for channel message to the msgDirector
datagram = PyDatagram()
# datagram.addServerControlHeader(CONTROL_ADD_POST_REMOVE)
datagram.addInt8(1)
datagram.addChannel(CONTROL_MESSAGE)
datagram.addUint16(CONTROL_ADD_POST_REMOVE)
datagram.addBlob(themessage.getMessage())
self.send(datagram)
def addPostSocketCloseUD(self, dclassName, fieldName, doId, args):
dclass = self.dclassesByName.get(dclassName)
assert dclass, 'dclass %s not found in DC files' % dclassName
dg = dclass.aiFormatUpdate(
fieldName, doId, doId, self.ourChannel, args)
self.addPostSocketClose(dg)
def unregisterForChannel(self, channelNumber):
if self._channels.get(channelNumber) is None:
# We are already unregistered for this channel.
return
del self._channels[channelNumber]
# Time to send a unregister for channel message to the msgDirector
datagram = PyDatagram()
# datagram.addServerControlHeader(CONTROL_REMOVE_CHANNEL)
datagram.addInt8(1)
datagram.addChannel(CONTROL_MESSAGE)
datagram.addUint16(CONTROL_REMOVE_CHANNEL)
datagram.addChannel(channelNumber)
self.send(datagram)
#----------------------------------
def addAvatarToChannels(self, avatarId, listOfChannels):
"""
avatarId is a 32 bit doId
"""
assert self.notify.debugCall()
self.addConnectionToChannels((1<<32)+avatarId, listOfChannels)
def removeAvatarFromChannels(self, avatarId, listOfChannels):
"""
avatarId is a 32 bit doId
"""
assert self.notify.debugCall()
self.removeConnectionToChannels((1<<32)+avatarId, listOfChannels)
def addConnectionToChannels(self, targetConnection, listOfChannels):
"""
avatarId is a 32 bit doId
"""
assert self.notify.debugCall()
dg = PyDatagram()
dg.addServerHeader(
targetConnection, self.serverId, CLIENT_AGENT_OPEN_CHANNEL)
for i in listOfChannels:
dg.addUint64(i)
self.send(dg)
def removeConnectionToChannels(self, targetConnection, listOfChannels):
"""
avatarId is a 32 bit doId
"""
assert self.notify.debugCall()
dg = PyDatagram()
dg.addServerHeader(
targetConnection, self.serverId, CLIENT_AGENT_CLOSE_CHANNEL)
for i in listOfChannels:
dg.addUint64(i)
self.send(dg)
def addInterestToConnection(self, targetConnection, interestId,
contextId, parentDoId, zoneIdList):
"""
Allows the AIRepository to initiate interest on the client.
See otp.ai.AIInterestHandles for a list of interestId's to use.
"""
assert self.notify.debugCall()
dg = PyDatagram()
dg.addServerHeader(
(1<<32)+targetConnection, self.serverId,
CLIENT_AGENT_SET_INTEREST)
# Set the high bit to indicate that the interest is being governed by
# the AI and not the client
dg.addUint16((1<<15)+interestId)
dg.addUint32(contextId)
dg.addUint32(parentDoId)
dg.addUint32(contextId)
if isinstance(zoneIdList, types.ListType):
# sort and remove repeated entries
zIdSet = set(zoneIdList)
for zoneId in zIdSet:
dg.addUint32(zoneId)
else:
dg.addUint32(zoneIdList)
self.send(dg)
def removeInterestFromConnection(self, targetConnection, interestId,
contextId=0):
"""
Allows the AIRepository to remove interest on the client.
See otp.ai.AIInterestHandles for a list of interestId's to use.
"""
assert self.notify.debugCall()
dg = PyDatagram()
dg.addServerHeader(
(1<<32)+targetConnection, self.serverId,
CLIENT_AGENT_REMOVE_INTEREST)
# set the high bit to indicate that the interest is being governed by
# the AI and not the client
dg.addUint16((1<<15)+interestId)
dg.addUint32(contextId)
self.send(dg)
def setAllowClientSend(self, avatarId,
dObject, fieldNameList = []):
"""
Allow an AI to temporarily give a client 'clsend' privileges
on a particular fields on a particular object. This should
be used on fields that are 'ownsend' by default. When you want
to revoke these privileges, use clearAllowClientSend() to end
these privileges.
"""
assert self.notify.debugCall()
dg = PyDatagram()
dg.addServerHeader(
(1<<32)+avatarId, self.serverId,
CLIENT_SET_FIELD_SENDABLE)
# Set the high bit to indicate that the interest is being governed by
# the AI and not the client
dg.addUint32(dObject.doId)
assert isinstance(fieldNameList, types.ListType)
dclass = dObject.dclass
# sort and remove repeated entries as we discover the field
# ids for the specified names
fieldIdSet = set(dclass.getFieldByName(name).getNumber() \
for name in fieldNameList)
# insert the fieldIds into the datagram
for fieldId in sorted(fieldIdSet):
dg.addUint16(fieldId)
self.send(dg)
def clearAllowClientSend(self, avatarId, dObject):
self.setAllowClientSend(avatarId, dObject)
# ----------------------------------
def setConnectionName(self, name):
self.connectionName = name
# Time to send a register for channel message to the msgDirector
datagram = PyDatagram()
# datagram.addServerControlHeader(CONTROL_SET_CON_NAME)
datagram.addInt8(1)
datagram.addChannel(CONTROL_MESSAGE)
datagram.addUint16(CONTROL_SET_CON_NAME)
datagram.addString(name)
self.send(datagram)
def setConnectionURL(self, url):
self.connectionURL = url
# Time to send a register for channel message to the msgDirector
datagram = PyDatagram()
# datagram.addServerControlHeader(CONTROL_SET_CON_NAME)
datagram.addInt8(1)
datagram.addChannel(CONTROL_MESSAGE)
datagram.addUint16(CONTROL_SET_CON_URL)
datagram.addString(url)
self.send(datagram)
def deleteObjects(self):
# This is meant to be a pure virtual function that gets
# overridden by inheritors.
# This function is where objects that manage DistributedObjectAI's
# should be cleaned up. Since this is only called during a district
# shutdown of some kind, it is not useful to delete the existing
# DistributedObjectAI's, but rather to just make sure that they
# are no longer referenced, and no new ones are created.
pass
def allocateChannel(self):
channel=self.channelAllocator.allocate()
if channel==-1:
raise RuntimeError("channelAllocator.allocate() is out of channels")
if self.channelAllocator.fractionUsed()>0.75:
# There is some debate about how bad it is to run out of
# channels. Being ignorant about what exactly will happen
# if a channel is reused too quickly, we decided to bail
# out if we got low on channels. By the way, being low on
# channels is not the real problem. The problem is reusing
# a channel too soon after it's freed. There is an assumption
# here that if there are a lot of free channels and the
# channels are reused in the same order that they are freed,
# then there is a good chance that any allocated channel has
# "aged" properly.
# Schuyler has daydreamed about a system that will track the age
# of freed ids and sleep or flag an error as apropriate. See him
# for details (esp. if you want to write a cross-platform version
# of said feature).
raise RuntimeError("Dangerously low on channels.")
# Sanity check
assert (channel >= self.minChannel) and (channel <= self.maxChannel)
if 0: ## used to debug the channel code
import traceback
if not hasattr(self, "debug_dictionary"):
self.debug_dictionary = {}
__builtins__["debug_dictionary"] = self.debug_dictionary
for id in self.debug_dictionary.keys():
if id not in self.doId2do:
print("--------------------- Not In DOID table")
print(id)
#traceback.print_list(self.debug_dictionary[id])
print(self.debug_dictionary[id])
del self.debug_dictionary[id] # never report it again ..
self.debug_dictionary[channel] = traceback.extract_stack(None,7)
return channel
def deallocateChannel(self, channel):
if 0: ## used to debug the channel code .. see above
del self.debug_dictionary[channel]
self.channelAllocator.free(channel)
def getMinDynamicZone(self):
# Override this to return the minimum allowable value for a
# dynamically-allocated zone id.
return 0
def getMaxDynamicZone(self):
# Override this to return the maximum allowable value for a
# dynamically-allocated zone id.
return 0
def allocateZone(self):
zoneId=self.zoneAllocator.allocate()
if zoneId==-1:
raise RuntimeError("zoneAllocator.allocate() is out of zoneIds")
# Sanity check
assert (zoneId >= self.minZone) and (zoneId <= self.maxZone)
return zoneId
def deallocateZone(self, zoneId):
self.zoneAllocator.free(zoneId)
# NOTE: the public API for this is in DistributedObjectAI.b_setLocation()
# @report(types = ['args'], dConfigParam = 'avatarmgr')
def sendSetLocation(self, distobj, parentId, zoneId, owner=None):
datagram = PyDatagram()
datagram.addServerHeader(
distobj.doId, self.ourChannel, STATESERVER_OBJECT_SET_ZONE)
datagram.addUint32(parentId)
datagram.addUint32(zoneId)
self.send(datagram)
@report(types = ['args'], dConfigParam = 'avatarmgr')
def sendSetLocationDoId(self, doId, parentId, zoneId, owner=None):
datagram = PyDatagram()
datagram.addServerHeader(
doId, self.ourChannel, STATESERVER_OBJECT_SET_ZONE)
datagram.addUint32(parentId)
datagram.addUint32(zoneId)
self.send(datagram)
@report(types = ['args'], dConfigParam = 'avatarmgr')
def sendSetOwnerDoId(self, doId, ownerId):
# allowed channels are (from http://aspen.online.disney.com/mediawiki/index.php/P2P%2C_OWNERSHIP_STUFF):
# account<<32 + avatar
# 1<<32 + avatar
# 1<<32 + account
datagram = PyDatagram()
datagram.addServerHeader(
doId, self.ourChannel, STATESERVER_OBJECT_SET_OWNER_RECV)
datagram.addChannel((1<<32) + ownerId)
self.send(datagram)
@report(types = ['args'], dConfigParam = 'avatarmgr')
def sendClearOwnerDoId(self, doId):
datagram = PyDatagram()
datagram.addServerHeader(
doId, self.ourChannel, STATESERVER_OBJECT_SET_OWNER_RECV)
datagram.addChannel(0)
self.send(datagram)
def sendSetZone(self, distobj, zoneId):
self.notify.error("non-district types should not call sendSetZone")
def startTrackRequestDeletedDO(self, obj):
if obj.doId in self._requestDeletedDOs:
self.notify.warning('duplicate requestDelete for %s %s' % (obj.__class__.__name__, obj.doId))
# store object and time of requestDelete
self._requestDeletedDOs[obj.doId] = (obj, globalClock.getRealTime())
def stopTrackRequestDeletedDO(self, obj):
# sometimes objects are deleted without having requested a delete
#assert obj.doId in self._requestDeletedDOs
if (hasattr(obj,'doId')) and (obj.doId in self._requestDeletedDOs):
del self._requestDeletedDOs[obj.doId]
def getRequestDeletedDOs(self):
# returns list of (obj, age of delete request), sorted by descending age
response = []
now = globalClock.getRealTime()
for obj, requestTime in self._requestDeletedDOs.values():
# calculate how long it has been since delete was requested
age = now - requestTime
index = 0
while index < len(response):
if age > response[index][1]:
break
index += 1
response.insert(index, (obj, age))
return response
@report(types = ['args'], dConfigParam = 'avatarmgr')
def requestDelete(self, distobj):
# Create a message
datagram = PyDatagram()
datagram.addServerHeader(
distobj.doId, self.ourChannel, STATESERVER_OBJECT_DELETE_RAM)
# The Id of the object in question
datagram.addUint32(distobj.doId)
self.send(datagram)
@report(types = ['args'], dConfigParam = 'avatarmgr')
def requestDeleteDoId(self, doId):
# Create a message
datagram = PyDatagram()
datagram.addServerHeader(
doId, self.ourChannel, STATESERVER_OBJECT_DELETE_RAM)
# The Id of the object in question
datagram.addUint32(doId)
self.send(datagram)
def requestDeleteDoIdFromDisk(self, doId):
# Create a message
datagram = PyDatagram()
datagram.addServerHeader(
doId, self.ourChannel, STATESERVER_OBJECT_DELETE_DISK)
# The Id of the object in question
datagram.addUint32(doId)
self.send(datagram)
def getDatabaseGenerateResponseEvent(self, context):
# handler must accept (doId)
return 'DBGenResponse-%s' % context
def _handleDatabaseGenerateResponse(self, di):
assert self.notify.debugCall(self)
context = di.getUint32()
doId = di.getUint32()
self.notify.debug(
'_handleDatabaseGenerateResponse, context=%s, doId=%s' %
(context, doId))
messenger.send(
self.getDatabaseGenerateResponseEvent(context), [doId])
def getDatabaseIdForClassName(self, className):
assert 0
# You probably want to override this to return something better.
return 0
def requestDatabaseGenerate(
self, classId, context,
parentId=0, zoneId=0,
ownerChannel=0, ownerAvId=None,
databaseId=None, values = None):
"""
context is any 32 bit integer to be used a reference
for this message (and its reply).
parentId may be 0 or a valid distributed object ID.
zoneId may be 0 or a valid zone ID (int32).
ownerChannel may be 0 or a valid owner channel (int64)
OR
ownerAvId may be None or a player doId
databaseId is a distributed object ID for the database
(maybe a constant).
values is a dictionary of other member values (or None)
To get doId of new object, listen for this event:
air.getDatabaseGenerateResponseEvent(context)
If object location was provided, and the location is on this
district, the object will be generated shortly after the
above message is sent.
"""
AIRepository.notify.debugCall()
#self.notify.info('requestDatabaseGenerate, class=%s, context=%s' %
# (classId, context))
if ownerChannel == 0 and ownerAvId is not None:
ownerChannel = (1<<32) + ownerAvId
if classId in self.dclassesByNumber:
dclass = self.dclassesByNumber[classId]
else:
if classId in self.dclassesByName:
dclass = self.dclassesByName[classId]
elif classId+self.dcSuffix in self.dclassesByName:
dclass = self.dclassesByName[classId+self.dcSuffix]
elif classId+'AI' in self.dclassesByName:
dclass = self.dclassesByName[classId+'AI']
else:
self.notify.warning("dclass not found %s"%(classId,))
if __dev__:
import pdb; pdb.set_trace()
if databaseId is None:
databaseId=self.getDatabaseIdForClassName(dclass.getName())
if values is None:
if simbase.newDBRequestGen:
dg = dclass.aiDatabaseGenerateContext(
context, parentId, zoneId, ownerChannel,
databaseId, self.ourChannel)
else:
dg = dclass.aiDatabaseGenerateContextOld(
context, parentId, zoneId,
databaseId, self.ourChannel)
self.send(dg)
else:
packer = DCPacker()
packer.rawPackUint8(1)
packer.rawPackUint64(databaseId)
packer.rawPackUint64(self.ourChannel)
packer.rawPackUint16(
STATESERVER_OBJECT_CREATE_WITH_REQUIRED_CONTEXT)
## packer.rawPackUint16(
## STATESERVER_OBJECT_CREATE_WITH_REQUIR_OTHER_CONTEXT)
packer.rawPackUint32(parentId)
packer.rawPackUint32(zoneId)
if simbase.newDBRequestGen:
packer.rawPackUint64(ownerChannel)
packer.rawPackUint16(dclass.getNumber())
packer.rawPackUint32(context)
optionalFields = []
for i in range(dclass.getNumInheritedFields()):
field = dclass.getInheritedField(i)
if field.asMolecularField() == None:
if field.isRequired():
# Packs Required Fields
value = values.get(field.getName(), None)
packer.beginPack(field)
if value == None:
packer.packDefaultValue()
else:
if not field.packArgs(packer, value):
raise StandardError
packer.endPack()
else:
value = values.get(field.getName(), None)
if value != None:
fieldDec = {}
fieldDec['field'] = field
fieldDec['value'] = value
optionalFields.append(fieldDec)
packer.rawPackUint16(len(optionalFields))
# Packs Optional Fields
for i in optionalFields:
field = i['field']
value = i['value']
packer.rawPackUint16(field.getNumber())
packer.beginPack(field)
field.packArgs(packer, value)
packer.endPack()
if packer.hadError():
raise StandardError
dg = Datagram(packer.getString())
self.send(dg)
def lostConnection(self):
ConnectionRepository.lostConnection(self)
sys.exit()
def handleDatagram(self, di):
if self.notify.getDebug():
print("AIRepository received datagram:")
di.getDatagram().dumpHex(Notify.out())
channel=self.getMsgChannel()
if channel in self.netMessenger.channels:
self.netMessenger.handle(di.getString())
else:
self.handler(self.getMsgType(), di)
def deleteDistObject(self, do):
assert self.notify.debugCall()
# if not hasattr(do, "isQueryAllResponse") or not do.isQueryAllResponse:
do.sendDeleteEvent()
# Remove it from the dictionary
self.removeDOFromTables(do)
# Delete the object itself
do.delete()
if self._proactiveLeakChecks:
# make sure we're not leaking
do.detectLeaks()
def sendAnotherGenerate(self, distObj, toChannel):
assert self.notify.debugCall()
dg = distObj.dclass.aiFormatGenerate(
distObj, distObj.doId, distObj.parentId, distObj.zoneId,
toChannel, self.ourChannel, [])
self.send(dg)
def generateWithRequired(self, distObj, parentId, zoneId, optionalFields=[]):
assert self.notify.debugStateCall(self)
# Assign it an id
distObj.doId = self.allocateChannel()
# Put the new DO in the dictionaries
self.addDOToTables(distObj, location = (parentId,zoneId))
# Send a generate message
distObj.sendGenerateWithRequired(self, parentId, zoneId, optionalFields)
# this is a special generate used for estates, or anything else that
# needs to have a hard coded doId as assigned by the server
def generateWithRequiredAndId(
self, distObj, doId, parentId, zoneId, optionalFields=[]):
assert self.notify.debugStateCall(self)
# Assign it an id
distObj.doId = doId
# Since the distObj has been created explicitly from the
# server, we do not own its doId, and hence we shouldn't try
# to deallocate it.
distObj.doNotDeallocateChannel = 1
# Put the new DO in the dictionaries
self.addDOToTables(distObj, location = (parentId,zoneId))
# Send a generate message
distObj.sendGenerateWithRequired(self, parentId, zoneId, optionalFields)
def queryObjectAll(self, doId, context=0):
"""
Get a one-time snapshot look at the object.
"""
assert self.notify.debugStateCall(self)
# Create a message
datagram = PyDatagram()
datagram.addServerHeader(
doId, self.ourChannel, STATESERVER_QUERY_OBJECT_ALL)
# A context that can be used to index the response if needed
datagram.addUint32(context)
self.send(datagram)
# Make sure the message gets there.
self.flush()
def queryObjectZoneIds(self, stateServerId, obj2ZoneDict):
# obj2ZoneDict should be a dict looking like this:
# { objId : [zoneId, zoneId, ...],
# objId : [zoneId, zoneId, ...],
# }
assert self.notify.debugStateCall(self)
# Create a message
datagram = PyDatagram()
datagram.addServerHeader(
stateServerId, self.ourChannel, STATESERVER_QUERY_ZONE_OBJECT_ALL)
numObjs = len(obj2ZoneDict.keys())
datagram.addUint16(numObjs)
for objId, zoneIds in obj2ZoneDict.values():
datagram.addUint32(objId)
datagram.addUint16(len(zoneIds))
for zoneId in zoneIds:
datagram.addUint32(zoneId)
# This is for a forwarding system that I did not hook up.
# Ask Roger for details.
datagram.addUint16(0)
self.send(datagram)
# Make sure the message gets there.
self.flush()
def queryObjectChildrenLocal(self, parentId, context=0):
assert self.notify.debugStateCall(self)
# Create a message
datagram = PyDatagram()
datagram.addServerHeader(
self.serverId, self.ourChannel, STATESERVER_QUERY_OBJECT_CHILDREN_LOCAL)
datagram.addUint32(parentId)
datagram.addUint32(context)
self.send(datagram)
# Make sure the message gets there.
self.flush()
def handleQueryObjectChildrenLocalDone(self, di):
# We asked to get generates for all objects that are children of
# us. This is the callback that says the server is done sending all
# those generates.
assert self.notify.debugCall()
parentId = di.getUint16()
context = di.getUint32()
parent = self.doId2do.get(parentId)
if parent:
parent.handleQueryObjectChildrenLocalDone(context)
else:
self.notify.warning('handleQueryObjectChildrenLocalDone: parentId %s not found' %
parentId)
@report(types = ['args'], dConfigParam = 'avatarmgr')
def queryObjectFieldId(self, doId, fieldId, context=0):
"""
Get a one-time snapshot look at the object.
"""
assert self.notify.debugStateCall(self)
# Create a message
datagram = PyDatagram()
datagram.addServerHeader(
doId, self.ourChannel, STATESERVER_OBJECT_QUERY_FIELD)
datagram.addUint32(doId)
datagram.addUint16(fieldId)
# A context that can be used to index the response if needed
datagram.addUint32(context)
self.send(datagram)
# Make sure the message gets there.
self.flush()
@report(types = ['args'], dConfigParam = 'avatarmgr')
def queryObjectFieldIds(self, doId, fieldIds, context=0):
"""
Get a one-time snapshot look at the object.
Query multiple field IDs from the same object.
"""
assert self.notify.debugStateCall(self)
# Create a message
datagram = PyDatagram()
datagram.addServerHeader(
doId, self.ourChannel, STATESERVER_OBJECT_QUERY_FIELDS)
datagram.addUint32(doId)
datagram.addUint32(context)
for x in fieldIds:
datagram.addUint16(x)
self.send(datagram)
# Make sure the message gets there.
self.flush()
@report(types = ['args'], dConfigParam = 'avatarmgr')
def queryObjectStringFieldIds(self, dbId, objString, fieldIds, context=0):
"""
Get a one-time snapshot look at the object.
Query multiple field IDs from the same object, by object string.
"""
assert self.notify.debugStateCall(self)
# Create a message
dg = PyDatagram()
dg.addServerHeader(
dbId, self.ourChannel, STATESERVER_OBJECT_QUERY_FIELDS_STRING)
dg.addString(objString)
dg.addUint32(context)
for x in fieldIds:
dg.addUint16(x)
self.send(dg)
# Make sure the message gets there.
self.flush()
@report(types = ['args'], dConfigParam = 'avatarmgr')
def queryObjectStringFields(
self, dbId, dclassName, objString, fieldNames, context=0):
"""
Get a one-time snapshot look at the object.
Query multiple field names from the same object, by object string.
"""
assert self.notify.debugStateCall(self)
assert len(dclassName) > 0
for fn in fieldNames:
assert len(fn) > 0
dclass = self.dclassesByName.get(dclassName)
assert dclass is not None
if not dclass:
self.notify.error(
"queryObjectStringFields invalid dclassName %s"%(dclassName))
return
if dclass is not None:
fieldIds = []
for fn in fieldNames:
id = dclass.getFieldByName(fn).getNumber()
assert id
if not id:
self.notify.error(
"queryObjectStrongFields invalid field %s, %s"%(doId,fn))
return
fieldIds.append(id)
self.queryObjectStringFieldIds(dbId,objString,fieldIds,context)
@report(types = ['args'], dConfigParam = 'avatarmgr')
def queryObjectField(self, dclassName, fieldName, doId, context=0):
"""
See Also: def sendUpdateToDoId
"""
assert self.notify.debugStateCall(self)
assert len(dclassName) > 0
assert len(fieldName) > 0
assert doId > 0
dclass = self.dclassesByName.get(dclassName)
assert dclass is not None
if not dclass:
self.notify.error(
"queryObjectField invalid dclassName %s, %s"%(doId, fieldName))
return
if dclass is not None:
fieldId = dclass.getFieldByName(fieldName).getNumber()
assert fieldId # is 0 a valid value?
if not fieldId:
self.notify.error(
"queryObjectField invalid field %s, %s"%(doId, fieldName))
return
self.queryObjectFieldId(doId, fieldId, context)
@report(types = ['args'], dConfigParam = 'avatarmgr')
def queryObjectFields(self, dclassName, fieldNames, doId, context=0):
"""
See Also: def sendUpdateToDoId
"""
assert self.notify.debugStateCall(self)
assert len(dclassName) > 0
assert len(fieldNames) > 0
for fieldName in fieldNames:
assert len(fieldName) > 0
assert doId > 0
dclass = self.dclassesByName.get(dclassName)
assert dclass is not None
if not dclass:
self.notify.error(
"queryObjectField invalid dclassName %s, %s"%(doId, fieldName))
return
if dclass is not None:
fieldIds = [dclass.getFieldByName(fieldName).getNumber() \
for fieldName in fieldNames]
# is 0 a valid value?
assert 0 not in fieldIds
if 0 not in fieldIds:
self.queryObjectFieldIds(doId, fieldIds, context)
else:
assert self.notify.error(
"queryObjectFields invalid field in %s, %s"%(doId, repr(fieldNames)))
def requestDistributedObject(self, doId):
"""
Ask for the object to be added to the private
distributed object cache.
Normally a query object all does not actually enter the object
returned into the doId2do table. This function will change a query
request to a distributed object that is part of the normal set of
objects on this server.
"""
assert self.notify.debugCall()
distObj = self.doId2do.get(doId)
if distObj is not None:
# Already have it
return
if doId in self.distributedObjectRequests:
# Already requested it
return
#todo: add timeout for to remove request
self.distributedObjectRequests.add(doId)
context=self.allocateContext()
self.acceptOnce(
"doRequestResponse-%s"%(context,),
self.postGenerate, [])
self.registerForChannel(doId)
self.queryObjectAll(doId, context)
# If you want to set the airecv you should set the location
def setAIReceiver(self, objectId, aiChannel=None):
# Create a message
datagram = PyDatagram()
datagram.addServerHeader(
self.ourChannel, self.ourChannel, STATESERVER_ADD_AI_RECV)
# The Id of the object in question
datagram.addUint32(objectId)
if aiChannel is None:
aiChannel = self.ourChannel
datagram.addUint32(aiChannel)
# Send the message
self.send(datagram)
# Make sure the message gets there.
self.flush()
def replaceMethod(self, oldMethod, newFunction):
return 0
def sendUpdate(self, distObj, fieldName, args):
dg = distObj.dclass.aiFormatUpdate(
fieldName, distObj.doId, distObj.doId, self.ourChannel, args)
self.sendDatagram(dg)
def _handleDatabaseGetStoredValuesResp(self, di):
context = di.getUint32()
dbObj = self.dbObjMap.get(context)
if dbObj:
del self.dbObjMap[context]
dbObj.getFieldsResponse(di)
else:
AIRepository.notify.warning(
"Ignoring unexpected context %d for DBSERVER_GET_STORED_VALUES" %
context)
def _handleDatabaseCreateStoredObjectResp(self, di):
context = di.getUint32()
dbObj = self.dbObjMap.get(context)
if dbObj:
del self.dbObjMap[context]
dbObj.handleCreateObjectResponse(di)
else:
AIRepository.notify.warning(
"Ignoring unexpected context %d for DBSERVER_CREATE_STORED_OBJECT" %
context)
# LEADERBOARD
def setLeaderboardValue(self, category, whoId, whoName, value, senderId=None):
self.writeServerEvent('setLeaderboardValue', whoId,
'%s|%s|%s' % (whoName, category, value))
dcfile = self.getDcFile()
dclass = dcfile.getClassByName('LeaderBoard')
if senderId is None:
senderId = self.districtId
dg = dclass.aiFormatUpdate('setValue',
OtpDoGlobals.OTP_DO_ID_LEADERBOARD_MANAGER,
OtpDoGlobals.OTP_DO_ID_LEADERBOARD_MANAGER,
senderId,
[[category], whoId, whoName, value])
self.send(dg)
##################################################################
# msgsend is set by the C code on the repository.. These Functions
# will let you parse the special encoded sender id if you want to
# know the AvatarID or The AccountID of the Sender..
def getAccountIdFromSender(self):
"""
only works on the dc updates from the client agent
"""
return self.getMsgSender() >> 32
def getAvatarIdFromSender(self):
"""
only works on the dc updates from the client agent
"""
return self.getMsgSender() & 0xffffffff
def getSenderReturnChannel(self):
return self.getMsgSender()
########################################
# Network reading and time device.. for ai's
def taskManagerDoYieldNetwork(self , frameStartTime, nextScheuledTaksTime):
minFinTime = frameStartTime + self.MaxEpockSpeed
if nextScheuledTaksTime > 0 and nextScheuledTaksTime < minFinTime:
minFinTime = nextScheuledTaksTime
self.networkBasedReaderAndYielder(self.handleDatagram,globalClock,minFinTime)
if not self.isConnected():
self.stopReaderPollTask()
self.lostConnection()
###############################################################
# Optimized version of old behavior..
def readerPollUntilEmpty(self, task):
self.checkDatagramAi(self.handleDatagram)
if not self.isConnected():
self.stopReaderPollTask()
self.lostConnection()
return Task.cont
###############################################################
# This can be used to do time based yielding instead of the sleep task.
def taskManagerDoYield(self , frameStartTime, nextScheuledTaksTime):
minFinTime = frameStartTime + self.MaxEpockSpeed
if nextScheuledTaksTime > 0 and nextScheuledTaksTime < minFinTime:
minFinTime = nextScheuledTaksTime
delta = minFinTime - globalClock.getRealTime()
while(delta > 0.002):
time.sleep(delta)
delta = minFinTime - globalClock.getRealTime()
###############################################################
# This can be used to do time based yielding instead of the sleep task.
def startReaderPollTask(self):
if not self.AIRunningNetYield:
ConnectionRepository.startReaderPollTask(self)
else:
print('########## startReaderPollTask New ')
self.stopReaderPollTask()
self.accept(CConnectionRepository.getOverflowEventName(),self.handleReaderOverflow)
def getTrackClsends(self):
return False